User: chirino 
  Date: 01/08/30 21:39:08

  Modified:    src/main/org/jboss/mq/pm/jdbc PersistenceManager.java
                        MessageLog.java
  Log:
  This PM was looking up its DataSource in JNDI too early (in initService).
  The lookup has been moved to when the PM is started (in startService).
  Still needs to be tested.
  
  Revision  Changes    Path
  1.4       +269 -301  jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManager.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- PersistenceManager.java   2001/08/30 02:35:55     1.3
  +++ PersistenceManager.java   2001/08/31 04:39:08     1.4
  @@ -5,341 +5,309 @@
    * See terms of license at gnu.org.
    */
   package org.jboss.mq.pm.jdbc;
  -import java.io.*;
  +
  +import javax.rmi.PortableRemoteObject;
  +import javax.jms.JMSException;
  +import javax.sql.*;
  +import javax.naming.*;
  +import javax.management.*;
  +import javax.naming.InitialContext;
   
   import java.net.URL;
  -import java.sql.*;
   import java.util.HashMap;
  +import java.util.TreeSet;
   import java.util.Iterator;
   import java.util.LinkedList;
   import java.util.Properties;
  -import java.util.TreeSet;
  -import javax.jms.JMSException;
  -
  -import javax.management.*;
  -import javax.naming.*;
  -
  -import javax.naming.InitialContext;
  -
  -import javax.rmi.PortableRemoteObject;
  -import javax.sql.*;
  -import org.jboss.mq.ConnectionToken;
  +import java.sql.*;
  +import java.io.*;
   
  +import org.jboss.util.ServiceMBeanSupport;
   import org.jboss.mq.SpyDestination;
  -import org.jboss.mq.SpyJMSException;
  -import org.jboss.mq.SpyMessage;
  -import org.jboss.mq.pm.TxManager;
  +import org.jboss.mq.xml.XElement;
  +import org.jboss.mq.ConnectionToken;
   import org.jboss.mq.server.JMSDestination;
   import org.jboss.mq.server.JMSServer;
  -import org.jboss.mq.xml.XElement;
  -
  -import org.jboss.system.ServiceMBeanSupport;
  +import org.jboss.mq.pm.TxManager;
  +import org.jboss.mq.SpyMessage;
  +import org.jboss.mq.SpyJMSException;
   
   /**
    *  This class manages all persistence related services for file based
    *  persistence.
    *
  - * @created    August 16, 2001
  - * @author:    Jayesh Parayali ([EMAIL PROTECTED])
  - * @version    $Revision: 1.3 $
  + * @author: Jayesh Parayali ([EMAIL PROTECTED])
  + *
  + *  @version $Revision: 1.4 $
    */
  -public class PersistenceManager extends org.jboss.system.ServiceMBeanSupport 
implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager {
  +public class PersistenceManager extends org.jboss.util.ServiceMBeanSupport 
implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager {
  +
  +   protected static DataSource datasource;
   
      // Log file used to store commited transactions.
  -   TxLog            txLog;
  +   TxLog txLog;
      // Maps SpyDestinations to SpyMessageLogs
  -   HashMap          messageLogs = new HashMap();
  +   HashMap messageLogs= new HashMap();
      // Maps (Long)txIds to LinkedList of AddFile tasks
  -   HashMap          transactedTasks = new HashMap();
  -   TxManager        txManager;
  +   HashMap transactedTasks= new HashMap();
   
  -   private String   jmsDBPoolName;
  -
  -   protected static DataSource datasource;
  -
  +   static class LogInfo {
  +       MessageLog log;
  +       SpyDestination destination;
   
  -   /**
  -    *  Insert the method's description here. Creation date: (6/27/2001 1:07:07
  -    *  AM)
  -    *
  -    * @param  newJmsDBPoolName  java.lang.String
  -    */
  -   public void setJmsDBPoolName( java.lang.String newJmsDBPoolName ) {
  -      jmsDBPoolName = newJmsDBPoolName;
  +       LogInfo(MessageLog log, SpyDestination destination) {
  +              this.log= log;
  +              this.destination= destination;
  +       }
      }
   
  -   /**
  -    *  Insert the method's description here. Creation date: (6/27/2001 1:07:07
  -    *  AM)
  -    *
  -    * @return    java.lang.String
  -    */
  -   public java.lang.String getJmsDBPoolName() {
  -      return jmsDBPoolName;
  +   class Transaction {
  +       private LogInfo logInfo;
  +       private SpyMessage message;
  +       private org.jboss.mq.pm.Tx txId;
  +       private boolean add;
  +       public Transaction(boolean add, LogInfo logInfo, SpyMessage message, 
org.jboss.mq.pm.Tx txId) {
  +              this.add= add;
  +              this.logInfo= logInfo;
  +              this.message= message;
  +              this.txId= txId;
  +       }
  +       public void commit() throws JMSException {
  +              if (!add)
  +                     logInfo.log.remove(message, txId);
  +       }
  +       public void rollback() throws JMSException {
  +              if (add)
  +                     logInfo.log.remove(message, txId);
  +       }
      }
   
  -   public String getName() {
  -      return "JBossMQ-PersistenceManager";
  -   }
  +   private String jmsDBPoolName;
  +   TxManager txManager;
   
      /**
  -    *  getTxManager method comment.
  -    *
  -    * @return    The TxManager value
  -    */
  -   public org.jboss.mq.pm.TxManager getTxManager() {
  -      return txManager;
  -   }
  -
  -
  -   public void initService()
  -      throws Exception {
  -
  -      //Get an InitialContext
  -      InitialContext ctx = new InitialContext();
  -      datasource = ( DataSource )ctx.lookup( jmsDBPoolName );
  -      txLog = new TxLog( datasource );
  -
  -      JMSServer server = ( JMSServer )getServer().invoke( new ObjectName( 
org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME ), "getJMSServer", new Object[]{}, 
new String[]{} );
  -      server.setPersistenceManager( this );
  -
  -   }
  -
  -   public void startService()
  -      throws Exception {
  -
  -      JMSServer server = ( JMSServer )getServer().invoke( new ObjectName( 
org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME ), "getJMSServer", new Object[]{}, 
new String[]{} );
  -      restore( server );
  -
  -   }
  -
  -
  -   public void destroyQueue( SpyDestination dest )
  -      throws javax.jms.JMSException {
  -
  -      try {
  -
  -         //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
  -         //java.io.File file = new java.io.File(logDir.getFile());
  -
  -         LogInfo logInfo;
  -         synchronized ( messageLogs ) {
  -            logInfo = ( LogInfo )messageLogs.remove( "" + dest );
  -         }
  -         if ( logInfo == null ) {
  -            throw new SpyJMSException( "The persistence log was never initialized" 
);
  -         }
  -
  -         logInfo.log.close();
  -         //file.delete();
  -
  -      } catch ( javax.jms.JMSException e ) {
  -         throw e;
  -      } catch ( Exception e ) {
  -         javax.jms.JMSException newE = new javax.jms.JMSException( "Invalid 
configuration." );
  -         newE.setLinkedException( e );
  -         throw newE;
  -      }
  -
  -   }
  -
  -   public void initQueue( SpyDestination dest )
  -      throws javax.jms.JMSException {
  -      try {
  -         //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
  -
  -         MessageLog log = new MessageLog( datasource, dest.toString() );
  -
  -         LogInfo info = new LogInfo( log, dest );
  -
  -         synchronized ( messageLogs ) {
  -            messageLogs.put( "" + dest, info );
  -         }
  -
  -      } catch ( javax.jms.JMSException e ) {
  -         throw e;
  -      } catch ( Exception e ) {
  -         javax.jms.JMSException newE = new javax.jms.JMSException( "Invalid 
configuration." );
  -         newE.setLinkedException( e );
  -         throw newE;
  -      }
  -
  -   }
  -
  -
  -   public void add( org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId )
  -      throws javax.jms.JMSException {
  -      LogInfo logInfo;
  -
  -      synchronized ( messageLogs ) {
  -         logInfo = ( LogInfo )messageLogs.get( "" + message.getJMSDestination() );
  -      }
  -
  -      if ( logInfo == null ) {
  -         throw new javax.jms.JMSException( "Destination was not initalized with the 
PersistenceManager" );
  -      }
  -
  -      logInfo.log.add( message, txId );
  -
  -      if ( txId != null ) {
  -         LinkedList tasks;
  -         synchronized ( transactedTasks ) {
  -            tasks = ( LinkedList )transactedTasks.get( txId );
  -         }
  -         if ( tasks == null ) {
  -            throw new javax.jms.JMSException( "Transaction is not active 5." );
  -         }
  -         synchronized ( tasks ) {
  -            tasks.addLast( new Transaction( true, logInfo, message, txId ) );
  -         }
  -      }
  -   }
  -
  -   public void commitPersistentTx( org.jboss.mq.pm.Tx txId )
  -      throws javax.jms.JMSException {
  -
  -      LinkedList transacted;
  -      synchronized ( transactedTasks ) {
  -         transacted = ( LinkedList )transactedTasks.remove( txId );
  -      }
  -      synchronized ( transacted ) {
  -         Iterator iter = transacted.iterator();
  -         while ( iter.hasNext() ) {
  -            Transaction task = ( Transaction )iter.next();
  -            task.commit();
  -         }
  -      }
  -
  -      txLog.commitTx( txId );
  -   }
  -
  -   public org.jboss.mq.pm.Tx createPersistentTx()
  -      throws javax.jms.JMSException {
  -      org.jboss.mq.pm.Tx txId = txLog.createTx();
  -      synchronized ( transactedTasks ) {
  -         transactedTasks.put( txId, new LinkedList() );
  -      }
  -      return txId;
  -   }
  -
  -   public void remove( org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId )
  -      throws javax.jms.JMSException {
  -      LogInfo logInfo;
  -
  -      synchronized ( messageLogs ) {
  -         logInfo = ( LogInfo )messageLogs.get( "" + message.getJMSDestination() );
  -      }
  -
  -      if ( logInfo == null ) {
  -         throw new javax.jms.JMSException( "Destination was not initalized with the 
PersistenceManager" );
  -      }
  -
  -      if ( txId == null ) {
  -         logInfo.log.remove( message, txId );
  -      } else {
  -         LinkedList tasks;
  -         synchronized ( transactedTasks ) {
  -            tasks = ( LinkedList )transactedTasks.get( txId );
  -         }
  -         if ( tasks == null ) {
  -            throw new javax.jms.JMSException( "Transaction is not active 6." );
  -         }
  -         synchronized ( tasks ) {
  -            tasks.addLast( new Transaction( false, logInfo, message, txId ) );
  -         }
  -      }
  -
  -   }
  -
  -   public void restore( org.jboss.mq.server.JMSServer server )
  -      throws javax.jms.JMSException {
  -
  -      TreeSet committingTXs = txLog.restore();
  -      HashMap clone;
  -      synchronized ( messageLogs ) {
  -         clone = ( HashMap )messageLogs.clone();
  -      }
  -
  -      Iterator iter = clone.values().iterator();
  -      while ( iter.hasNext() ) {
  -
  -         LogInfo logInfo = ( LogInfo )iter.next();
  -
  -         JMSDestination q = server.getJMSDestination( logInfo.destination );
  -
  -         SpyMessage rebuild[] = logInfo.log.restore( committingTXs, q.toString() );
  -
  -         //TODO: make sure this lock is good enough
  -         synchronized ( q ) {
  -            for ( int i = 0; i < rebuild.length; i++ ) {
  -               q.restoreMessage( rebuild[i] );
  -            }
  -         }
  -      }
  -
  -   }
  -
  -   public void rollbackPersistentTx( org.jboss.mq.pm.Tx txId )
  -      throws javax.jms.JMSException {
  +     * Insert the method's description here.
  +     * Creation date: (6/27/2001 1:07:07 AM)
  +     * @return java.lang.String
  +     */
  +   public java.lang.String getJmsDBPoolName() {
  +       return jmsDBPoolName;
  +   }   
   
  -      LinkedList transacted;
  -      synchronized ( transactedTasks ) {
  -         transacted = ( LinkedList )transactedTasks.remove( txId );
  -      }
  -      synchronized ( transacted ) {
  -         Iterator iter = transacted.iterator();
  -         while ( iter.hasNext() ) {
  -            Transaction task = ( Transaction )iter.next();
  -            task.rollback();
  -         }
  -      }
  +   public String getName() {
  +       return "JBossMQ-PersistenceManager";
  +   }   
   
  -      txLog.rollbackTx( txId );
  -   }
  +   public void initService() throws Exception {
   
  +       JMSServer server= (JMSServer) getServer().invoke(new 
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {
  +       }, new String[] {
  +       });
  +       server.setPersistenceManager(this);
   
  +   }   
   
      /**
  -    * @created    August 16, 2001
  -    */
  -   class Transaction {
  -      private LogInfo logInfo;
  -      private SpyMessage message;
  -      private org.jboss.mq.pm.Tx txId;
  -      private boolean add;
  -
  -      public Transaction( boolean add, LogInfo logInfo, SpyMessage message, 
org.jboss.mq.pm.Tx txId ) {
  -         this.add = add;
  -         this.logInfo = logInfo;
  -         this.message = message;
  -         this.txId = txId;
  -      }
  -
  -      public void commit()
  -         throws JMSException {
  -         if ( !add ) {
  -            logInfo.log.remove( message, txId );
  -         }
  -      }
  -
  -      public void rollback()
  -         throws JMSException {
  -         if ( add ) {
  -            logInfo.log.remove( message, txId );
  -         }
  -      }
  -   }
  +     * Insert the method's description here.
  +     * Creation date: (6/27/2001 1:07:07 AM)
  +     * @param newJmsDBPoolName java.lang.String
  +     */
  +   public void setJmsDBPoolName(java.lang.String newJmsDBPoolName) {
  +       jmsDBPoolName= newJmsDBPoolName;
  +   }   
  +
  +   public void startService() throws Exception {
  +
  +       //Get an InitialContext
  +       InitialContext ctx= new InitialContext();
  +       datasource= (DataSource) ctx.lookup(jmsDBPoolName);
  +       txLog= new TxLog(datasource);
  +
  +       Iterator i= messageLogs.values().iterator();
  +       while (i.hasNext()) {
  +              LogInfo li= (LogInfo) i.next();
  +              li.log.setDatasource(datasource);
  +       }
  +
  +       JMSServer server= (JMSServer) getServer().invoke(new 
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {
  +       }, new String[] {
  +       });
  +       restore(server);
  +
  +   }   
  +
  +   public void destroyQueue(SpyDestination dest) throws javax.jms.JMSException {
  +
  +       try {
  +
  +              //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
  +              //java.io.File file = new java.io.File(logDir.getFile());
  +
  +              LogInfo logInfo;
  +              synchronized (messageLogs) {
  +                     logInfo= (LogInfo) messageLogs.remove("" + dest);
  +              }
  +              if (logInfo == null)
  +                     throw new SpyJMSException("The persistence log was never 
initialized");
  +
  +              logInfo.log.close();
  +              //file.delete();
  +
  +       } catch (javax.jms.JMSException e) {
  +              throw e;
  +       } catch (Exception e) {
  +              javax.jms.JMSException newE= new javax.jms.JMSException("Invalid 
configuration.");
  +              newE.setLinkedException(e);
  +              throw newE;
  +       }
  +
  +   }   
  +
  +   public void initQueue(SpyDestination dest) throws javax.jms.JMSException {
  +       try {
  +
  +              MessageLog log= new MessageLog(dest.toString());
  +              LogInfo info= new LogInfo(log, dest);
  +
  +              synchronized (messageLogs) {
  +                     messageLogs.put("" + dest, info);
  +              }
  +
  +       } catch (javax.jms.JMSException e) {
  +              throw e;
  +       } catch (Exception e) {
  +              javax.jms.JMSException newE= new javax.jms.JMSException("Invalid 
configuration.");
  +              newE.setLinkedException(e);
  +              throw newE;
  +       }
  +
  +   }   
  +
  +   public void add(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException {
  +       LogInfo logInfo;
  +
  +       synchronized (messageLogs) {
  +              logInfo= (LogInfo) messageLogs.get("" + message.getJMSDestination());
  +       }
  +
  +       if (logInfo == null)
  +              throw new javax.jms.JMSException("Destination was not initalized with 
the PersistenceManager");
  +
  +       logInfo.log.add(message, txId);
  +
  +       if (txId != null) {
  +              LinkedList tasks;
  +              synchronized (transactedTasks) {
  +                     tasks= (LinkedList) transactedTasks.get(txId);
  +              }
  +              if (tasks == null)
  +                     throw new javax.jms.JMSException("Transaction is not active 
5.");
  +              synchronized (tasks) {
  +                     tasks.addLast(new Transaction(true, logInfo, message, txId));
  +              }
  +       }
  +
  +   }   
  +
  +   public void commitPersistentTx(org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException {
  +
  +       LinkedList transacted;
  +       synchronized (transactedTasks) {
  +              transacted= (LinkedList) transactedTasks.remove(txId);
  +       }
  +       synchronized (transacted) {
  +              Iterator iter= transacted.iterator();
  +              while (iter.hasNext()) {
  +                     Transaction task= (Transaction) iter.next();
  +                     task.commit();
  +              }
  +       }
  +
  +       txLog.commitTx(txId);
  +   }   
  +
  +   public org.jboss.mq.pm.Tx createPersistentTx() throws javax.jms.JMSException {
  +       org.jboss.mq.pm.Tx txId= txLog.createTx();
  +       synchronized (transactedTasks) {
  +              transactedTasks.put(txId, new LinkedList());
  +       }
  +       return txId;
  +   }   
   
      /**
  -    * @created    August 16, 2001
  -    */
  -   static class LogInfo {
  -      MessageLog    log;
  -      SpyDestination destination;
  +     * getTxManager method comment.
  +     */
  +   public org.jboss.mq.pm.TxManager getTxManager() {
  +       return txManager;
  +   }   
   
  -      LogInfo( MessageLog log, SpyDestination destination ) {
  -         this.log = log;
  -         this.destination = destination;
  -      }
  -   }
  -}
  +   public void remove(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId) 
throws javax.jms.JMSException {
  +       LogInfo logInfo;
  +
  +       synchronized (messageLogs) {
  +              logInfo= (LogInfo) messageLogs.get("" + message.getJMSDestination());
  +       }
  +
  +       if (logInfo == null)
  +              throw new javax.jms.JMSException("Destination was not initalized with 
the PersistenceManager");
  +
  +       if (txId == null)
  +              logInfo.log.remove(message, txId);
  +       else {
  +              LinkedList tasks;
  +              synchronized (transactedTasks) {
  +                     tasks= (LinkedList) transactedTasks.get(txId);
  +              }
  +              if (tasks == null)
  +                     throw new javax.jms.JMSException("Transaction is not active 
6.");
  +              synchronized (tasks) {
  +                     tasks.addLast(new Transaction(false, logInfo, message, txId));
  +              }
  +       }
  +
  +   }   
  +
  +   public void restore(org.jboss.mq.server.JMSServer server) throws 
javax.jms.JMSException {
  +
  +       TreeSet committingTXs= txLog.restore();
  +       HashMap clone;
  +       synchronized (messageLogs) {
  +              clone= (HashMap) messageLogs.clone();
  +       }
  +
  +       Iterator iter= clone.values().iterator();
  +       while (iter.hasNext()) {
  +
  +              LogInfo logInfo= (LogInfo) iter.next();
  +
  +              JMSDestination q= server.getJMSDestination(logInfo.destination);
  +
  +              SpyMessage rebuild[]= logInfo.log.restore(committingTXs, 
q.toString());
  +
  +              //TODO: make sure this lock is good enough
  +              synchronized (q) {
  +                     for (int i= 0; i < rebuild.length; i++) {
  +                        q.restoreMessage(rebuild[i]);
  +                     }
  +              }
  +       }
  +
  +   }   
  +
  +   public void rollbackPersistentTx(org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException {
  +
  +       LinkedList transacted;
  +       synchronized (transactedTasks) {
  +              transacted= (LinkedList) transactedTasks.remove(txId);
  +       }
  +       synchronized (transacted) {
  +              Iterator iter= transacted.iterator();
  +              while (iter.hasNext()) {
  +                     Transaction task= (Transaction) iter.next();
  +                     task.rollback();
  +              }
  +       }
  +
  +       txLog.rollbackTx(txId);
  +   }   
  +}
  \ No newline at end of file
  
  
  
  1.3       +154 -176  jbossmq/src/main/org/jboss/mq/pm/jdbc/MessageLog.java
  
  Index: MessageLog.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/MessageLog.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- MessageLog.java   2001/08/17 03:04:05     1.2
  +++ MessageLog.java   2001/08/31 04:39:08     1.3
  @@ -5,31 +5,30 @@
    * See terms of license at gnu.org.
    */
   package org.jboss.mq.pm.jdbc;
  -import java.io.*;
  -import java.io.File;
  -import java.io.FileInputStream;
  -import java.io.FileOutputStream;
   
   import java.io.IOException;
  -import java.io.ObjectInputStream;
  -import java.io.ObjectOutputStream;
   import java.io.Serializable;
  +import java.io.FileOutputStream;
  +import java.io.ObjectOutputStream;
  +import java.io.FileInputStream;
  +import java.io.ObjectInputStream;
  +import java.io.File;
   import java.sql.*;
  +import java.io.*;
   
   import javax.jms.JMSException;
   import javax.sql.*;
   
   import org.jboss.mq.SpyDestination;
  -import org.jboss.mq.SpyJMSException;
   import org.jboss.mq.SpyMessage;
  +import org.jboss.mq.SpyJMSException;
   
   /**
  - *  This is used to keep SpyMessages on the disk and is used reconstruct the
  - *  queue in case of provider failure.
  + * This is used to keep SpyMessages on the disk and is used reconstruct the
  + * queue in case of provider failure.
    *
  - * @created    August 16, 2001
  - * @author:    Jayesh Parayali ([EMAIL PROTECTED])
  - * @version    $Revision: 1.2 $
  + * @author: Jayesh Parayali ([EMAIL PROTECTED])
  + * @version $Revision: 1.3 $
    */
   public class MessageLog {
   
  @@ -37,174 +36,153 @@
      // Attributes
      /////////////////////////////////////////////////////////////////////
      //private File queueName;
  -   protected static DataSource datasource;
  +   protected DataSource datasource;
   
      /////////////////////////////////////////////////////////////////////
  -   // Constructor
  +   // Public Methods
      /////////////////////////////////////////////////////////////////////
  -   public MessageLog( DataSource datasource, String dest )
  -      throws JMSException {
  -      if ( this.datasource == null ) {
  -         this.datasource = datasource;
  -      }
  -   }
  +   public void close() throws JMSException {
  +   }   
  +
  +   public SpyMessage[] restore(java.util.TreeSet comittingTXs, String dest) throws 
JMSException {
  +       String destin= dest.substring(21, dest.length());
   
  +       java.util.TreeMap messageIndex= new java.util.TreeMap();
  +       PreparedStatement pstmt= null;
  +       ResultSet rs= null;
  +       Connection con= null;
  +
  +       try {
  +              con= datasource.getConnection();
  +              pstmt= con.prepareStatement("select messageblob, messageid from 
jms_messages where destination = ?");
  +              pstmt.setString(1, destin);
  +              rs= pstmt.executeQuery();
  +
  +              while (rs.next()) {
  +                     byte[] st= (byte[]) rs.getObject(1);
  +                     ByteArrayInputStream baip= new ByteArrayInputStream(st);
  +                     ObjectInputStream ois= new ObjectInputStream(baip);
  +                     // re-create the object
  +                     SpyMessage message= (SpyMessage) ois.readObject();
  +
  +                     //Long msgId = new 
Long(Long.parseLong(rs.getString(2).trim(),16));
  +                     //restore the messageId which is not persistent.
  +                     message.messageId= Long.parseLong(rs.getString(2).trim(), 16);
  +                     Long msgId= new Long(message.messageId);
  +                     messageIndex.put(msgId, message);
  +              }
  +       } catch (SQLException e) {
  +              throwJMSException("SQL error while rebuilding the tranaction log.", 
e);
  +       } catch (Exception e) {
  +              throwJMSException("Could not rebuild the queue from the queue's 
tranaction log.", e);
  +       } finally {
  +              try {
  +                     if (rs != null)
  +                        rs.close();
  +                     if (pstmt != null)
  +                        pstmt.close();
  +                     if (con != null)
  +                        con.close();
  +              } catch (SQLException e) {
  +                     throwJMSException("SQL error while closing the database 
connection", e);
  +              }
  +       }
  +
  +       SpyMessage rc[]= new SpyMessage[messageIndex.size()];
  +       java.util.Iterator iter= messageIndex.values().iterator();
  +       for (int i= 0; iter.hasNext(); i++)
  +              rc[i]= (SpyMessage) iter.next();
  +       return rc;
  +   }   
  +
  +   private void throwJMSException(String message, Exception e) throws JMSException {
  +       JMSException newE= new SpyJMSException(message);
  +       newE.setLinkedException(e);
  +       throw newE;
  +   }   
   
      /////////////////////////////////////////////////////////////////////
  -   // Public Methods
  +   // Constructor
      /////////////////////////////////////////////////////////////////////
  -   public void close()
  -      throws JMSException {
  -   }
  -
  -
  -   public SpyMessage[] restore( java.util.TreeSet comittingTXs, String dest )
  -      throws JMSException {
  -      String destin = dest.substring( 21, dest.length() );
  -
  -      java.util.TreeMap messageIndex = new java.util.TreeMap();
  -      PreparedStatement pstmt = null;
  -      ResultSet rs = null;
  -      Connection con = null;
  -
  -      try {
  -         con = datasource.getConnection();
  -         pstmt =
  -               con.prepareStatement
  -               ( "select messageblob, messageid from jms_messages where destination 
= ?" );
  -         pstmt.setString( 1, destin );
  -         rs = pstmt.executeQuery();
  -
  -         while ( rs.next() ) {
  -            byte[] st = ( byte[] )rs.getObject( 1 );
  -            ByteArrayInputStream baip =
  -                  new ByteArrayInputStream( st );
  -            ObjectInputStream ois =
  -                  new ObjectInputStream( baip );
  -            // re-create the object
  -            SpyMessage message = ( SpyMessage )ois.readObject();
  -
  -            //Long msgId = new Long(Long.parseLong(rs.getString(2).trim(),16));
  -            //restore the messageId which is not persistent.
  -            message.messageId = Long.parseLong( rs.getString( 2 ).trim(), 16 );
  -            Long msgId = new Long( message.messageId );
  -            messageIndex.put( msgId, message );
  -         }
  -      } catch ( SQLException e ) {
  -         throwJMSException( "SQL error while rebuilding the tranaction log.", e );
  -      } catch ( Exception e ) {
  -         throwJMSException( "Could not rebuild the queue from the queue's 
tranaction log.", e );
  -      } finally {
  -         try {
  -            if ( rs != null ) {
  -               rs.close();
  -            }
  -            if ( pstmt != null ) {
  -               pstmt.close();
  -            }
  -            if ( con != null ) {
  -               con.close();
  -            }
  -         } catch ( SQLException e ) {
  -            throwJMSException( "SQL error while closing the database connection", e 
);
  -         }
  -      }
  -
  -      SpyMessage rc[] = new SpyMessage[messageIndex.size()];
  -      java.util.Iterator iter = messageIndex.values().iterator();
  -      for ( int i = 0; iter.hasNext(); i++ ) {
  -         rc[i] = ( SpyMessage )iter.next();
  -      }
  -      return rc;
  -   }
  -
  -   public void add( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
  -      throws JMSException {
  -      PreparedStatement pstmt = null;
  -      Connection con = null;
  -
  -      try {
  -         con = datasource.getConnection();
  -         ByteArrayOutputStream baos = new ByteArrayOutputStream();
  -         ObjectOutputStream oos = new ObjectOutputStream( baos );
  -         oos.writeObject( message );
  -         byte[] messageAsBytes = baos.toByteArray();
  -         pstmt =
  -               con.prepareStatement
  -               ( "insert into jms_messages (messageid, destination, messageblob) 
VALUES(?,?,?)" );
  -         ByteArrayInputStream bais =
  -               new ByteArrayInputStream( messageAsBytes );
  -         pstmt.setString( 2, ( ( SpyDestination )message.getJMSDestination() 
).getName() );
  -         pstmt.setBinaryStream( 3, bais, messageAsBytes.length );
  -         String hexString = null;
  -         if ( message.messageId <= 0 ) {
  -            hexString = "-" + Long.toHexString( ( -1 ) * message.messageId );
  -         } else {
  -            hexString = Long.toHexString( message.messageId );
  -         }
  -
  -         pstmt.setString( 1, hexString );
  -         pstmt.executeUpdate();
  -
  -         pstmt.close();
  -      } catch ( IOException e ) {
  -         throwJMSException( "Could serialize the message.", e );
  -      } catch ( SQLException e ) {
  -         throwJMSException( "Could not write message to the database.", e );
  -      } finally {
  -         try {
  -            //if (pstmt != null)
  -            //pstmt.close();
  -            if ( con != null ) {
  -               con.close();
  -            }
  -         } catch ( SQLException e ) {
  -            throwJMSException( "Could not close the database.", e );
  -         }
  -
  -      }
  -   }
  -
  -   public void remove( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
  -      throws JMSException {
  -      PreparedStatement pstmt = null;
  -      Connection con = null;
  -      try {
  -         con = datasource.getConnection();
  -         pstmt =
  -               con.prepareStatement
  -               ( "delete from jms_messages where messageid = ? and destination = ?" 
);
  -         String hexString = null;
  -         if ( message.messageId <= 0 ) {
  -            hexString = "-" + Long.toHexString( ( -1 ) * message.messageId );
  -         } else {
  -            hexString = Long.toHexString( message.messageId );
  -         }
  -         pstmt.setString( 1, hexString );
  -         pstmt.setString( 2, ( ( SpyDestination )message.getJMSDestination() 
).getName().trim() );
  -
  -         pstmt.execute();
  -      } catch ( SQLException e ) {
  -         throwJMSException( "Could not remove the message.", e );
  -      } finally {
  -         try {
  -            if ( pstmt != null ) {
  -               pstmt.close();
  -            }
  -            if ( con != null ) {
  -               con.close();
  -            }
  -         } catch ( SQLException e ) {
  -            throwJMSException( "Could not close the database.", e );
  -         }
  -
  -      }
  -   }
  -
  -   private void throwJMSException( String message, Exception e )
  -      throws JMSException {
  -      JMSException newE = new SpyJMSException( message );
  -      newE.setLinkedException( e );
  -      throw newE;
  -   }
  -}
  +   public MessageLog(String dest) throws JMSException {
  +   }   
  +
  +   public void add(SpyMessage message, org.jboss.mq.pm.Tx transactionId) throws 
JMSException {
  +       PreparedStatement pstmt= null;
  +       Connection con= null;
  +
  +       try {
  +              con= datasource.getConnection();
  +              ByteArrayOutputStream baos= new ByteArrayOutputStream();
  +              ObjectOutputStream oos= new ObjectOutputStream(baos);
  +              oos.writeObject(message);
  +              byte[] messageAsBytes= baos.toByteArray();
  +              pstmt= con.prepareStatement("insert into jms_messages (messageid, 
destination, messageblob) VALUES(?,?,?)");
  +              ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
  +              pstmt.setString(2, ((SpyDestination) 
message.getJMSDestination()).getName());
  +              pstmt.setBinaryStream(3, bais, messageAsBytes.length);
  +              String hexString= null;
  +              if (message.messageId <= 0)
  +                     hexString= "-" + Long.toHexString((-1) * message.messageId);
  +              else
  +                     hexString= Long.toHexString(message.messageId);
  +
  +              pstmt.setString(1, hexString);
  +              pstmt.executeUpdate();
  +
  +              pstmt.close();
  +       } catch (IOException e) {
  +              throwJMSException("Could serialize the message.", e);
  +       } catch (SQLException e) {
  +              throwJMSException("Could not write message to the database.", e);
  +       } finally {
  +              try {
  +                     //if (pstmt != null)
  +                     //pstmt.close();
  +                     if (con != null)
  +                        con.close();
  +              } catch (SQLException e) {
  +                     throwJMSException("Could not close the database.", e);
  +              }
  +
  +       }
  +   }   
  +
  +   public javax.sql.DataSource getDatasource() {
  +       return datasource;
  +   }   
  +
  +   public void remove(SpyMessage message, org.jboss.mq.pm.Tx transactionId) throws 
JMSException {
  +       PreparedStatement pstmt= null;
  +       Connection con= null;
  +       try {
  +              con= datasource.getConnection();
  +              pstmt= con.prepareStatement("delete from jms_messages where messageid 
= ? and destination = ?");
  +              String hexString= null;
  +              if (message.messageId <= 0)
  +                     hexString= "-" + Long.toHexString((-1) * message.messageId);
  +              else
  +                     hexString= Long.toHexString(message.messageId);
  +              pstmt.setString(1, hexString);
  +              pstmt.setString(2, ((SpyDestination) 
message.getJMSDestination()).getName().trim());
  +
  +              pstmt.execute();
  +       } catch (SQLException e) {
  +              throwJMSException("Could not remove the message.", e);
  +       } finally {
  +              try {
  +                     if (pstmt != null)
  +                        pstmt.close();
  +                     if (con != null)
  +                        con.close();
  +              } catch (SQLException e) {
  +                     throwJMSException("Could not close the database.", e);
  +              }
  +
  +       }
  +   }   
  +
  +   public void setDatasource(javax.sql.DataSource newDatasource) {
  +       datasource= newDatasource;
  +   }   
  +}
  \ No newline at end of file
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to