User: pkendall Date: 01/08/08 18:18:28 Modified: src/main/org/jbossmq/pm PersistenceManager.java TxManager.java Added: src/main/org/jbossmq/pm Tx.java Log: Major updates (especially to topics). Speed improvements. Make JVM IL work (by using a singleton JMSServer). Message Listeners re-implemented using client-side thread. Revision Changes Path 1.4 +6 -6 jbossmq/src/main/org/jbossmq/pm/PersistenceManager.java Index: PersistenceManager.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/PersistenceManager.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- PersistenceManager.java 2001/07/28 00:33:38 1.3 +++ PersistenceManager.java 2001/08/09 01:18:28 1.4 @@ -25,7 +25,7 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @author Paul Kendall ([EMAIL PROTECTED]) * - * @version $Revision: 1.3 $ + * @version $Revision: 1.4 $ */ public interface PersistenceManager { @@ -53,17 +53,17 @@ /** * Create and return a unique transaction id. */ - public abstract Long createPersistentTx() throws javax.jms.JMSException; + public abstract Tx createPersistentTx() throws javax.jms.JMSException; /** * Commit the transaction to the persistent store. */ - public abstract void commitPersistentTx(Long txId) throws javax.jms.JMSException; + public abstract void commitPersistentTx(Tx txId) throws javax.jms.JMSException; /** * Rollback the transaction. */ - public abstract void rollbackPersistentTx(Long txId) throws javax.jms.JMSException; + public abstract void rollbackPersistentTx(Tx txId) throws javax.jms.JMSException; @@ -84,7 +84,7 @@ * Remove message from the persistent store. * If the message is part of a transaction, txId is not null. 
*/ - public abstract void add(SpyMessage message, Long txId) throws javax.jms.JMSException; + public abstract void add(SpyMessage message, Tx txId) throws javax.jms.JMSException; /** * Remove the queue, and all messages in it, from the persistent store @@ -100,5 +100,5 @@ * Remove message from the persistent store. * If the message is part of a transaction, txId is not null. */ - public abstract void remove(SpyMessage message, Long txId) throws javax.jms.JMSException; + public abstract void remove(SpyMessage message, Tx txId) throws javax.jms.JMSException; } 1.4 +66 -60 jbossmq/src/main/org/jbossmq/pm/TxManager.java Index: TxManager.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/TxManager.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- TxManager.java 2001/07/28 00:33:38 1.3 +++ TxManager.java 2001/08/09 01:18:28 1.4 @@ -15,8 +15,8 @@ import org.jbossmq.xml.XElement; import org.jbossmq.SpyMessage; import org.jbossmq.SpyDestination; +import org.jbossmq.SpyJMSException; - import org.jbossmq.ConnectionToken; /** @@ -25,7 +25,7 @@ * @author Hiram Chirino ([EMAIL PROTECTED]) * @author Paul Kendall ([EMAIL PROTECTED]) * - * @version $Revision: 1.3 $ + * @version $Revision: 1.4 $ */ public class TxManager { @@ -36,6 +36,30 @@ // Maps Global transactions to local transactions HashMap globalToLocal = new HashMap(); + protected static int MAX_POOL_SIZE = 500; + //pool of linked lists to use for storing txs tasks + java.util.ArrayList listPool = new java.util.ArrayList(); + + protected LinkedList getList(){ + synchronized(listPool){ + if(listPool.isEmpty()){ + return new LinkedList(); + }else{ + return (LinkedList)listPool.remove(listPool.size()-1); + } + } + } + + protected void releaseList(LinkedList list){ + synchronized(listPool){ + if(listPool.size() < MAX_POOL_SIZE){ + list.clear(); + listPool.add(list); + } + } + } + + class GlobalXID implements Runnable { 
ConnectionToken dc; Object xid; @@ -67,42 +91,41 @@ /** * Create and return a unique transaction id. */ - public final Long createTx() throws javax.jms.JMSException { - Long txId = persistenceManager.createPersistentTx(); + public final Tx createTx() throws javax.jms.JMSException { + Tx txId = persistenceManager.createPersistentTx(); synchronized (postCommitTasks) { - postCommitTasks.put(txId, new LinkedList()); - postRollbackTasks.put(txId, new LinkedList()); + postCommitTasks.put(txId, getList()); + postRollbackTasks.put(txId, getList()); } return txId; } - - /** * Commit the transaction to the persistent store. */ - public final void commitTx(Long txId) throws javax.jms.JMSException { + public final void commitTx(Tx txId) throws javax.jms.JMSException { LinkedList tasks; synchronized( postCommitTasks ) { tasks = (LinkedList)postCommitTasks.remove(txId); - postRollbackTasks.remove(txId); + releaseList((LinkedList)postRollbackTasks.remove(txId)); } if( tasks == null ) throw new javax.jms.JMSException("Transaction is not active for commit."); - persistenceManager.commitPersistentTx(txId); + persistenceManager.commitPersistentTx(txId); - synchronized(tasks){ - Iterator iter = tasks.iterator(); - while( iter.hasNext() ) { - Runnable task = (Runnable)iter.next(); - task.run(); - } - } - } + synchronized(tasks){ + Iterator iter = tasks.iterator(); + while( iter.hasNext() ) { + Runnable task = (Runnable)iter.next(); + task.run(); + } + } + releaseList(tasks); + } - public final void addPostCommitTask(Long txId, Runnable task) throws javax.jms.JMSException { + public final void addPostCommitTask(Tx txId, Runnable task) throws javax.jms.JMSException { if( txId == null ) { task.run(); @@ -124,29 +147,30 @@ /** * Rollback the transaction. 
*/ - public final void rollbackTx(Long txId) throws javax.jms.JMSException { + public final void rollbackTx(Tx txId) throws javax.jms.JMSException { LinkedList tasks; synchronized( postCommitTasks ) { tasks = (LinkedList)postRollbackTasks.remove(txId); - postCommitTasks.remove(txId); - } + releaseList((LinkedList)postCommitTasks.remove(txId)); + } if( tasks == null ) throw new javax.jms.JMSException("Transaction is not active 3."); - persistenceManager.rollbackPersistentTx(txId); + persistenceManager.rollbackPersistentTx(txId); - synchronized(tasks){ - Iterator iter = tasks.iterator(); - while( iter.hasNext() ) { - Runnable task = (Runnable)iter.next(); - task.run(); - } - } + synchronized(tasks){ + Iterator iter = tasks.iterator(); + while( iter.hasNext() ) { + Runnable task = (Runnable)iter.next(); + task.run(); + } + } - } + releaseList(tasks); + } - public final void addPostRollbackTask(Long txId, Runnable task) throws javax.jms.JMSException { + public final void addPostRollbackTask(Tx txId, Runnable task) throws javax.jms.JMSException { if( txId == null ) { return; @@ -164,24 +188,6 @@ } - - - - - - - - - - - - - - - - - - PersistenceManager persistenceManager; public TxManager( PersistenceManager pm ) { @@ -195,15 +201,15 @@ * allocate a unique local transaction id if the remote id is not already * known. 
// */ - public final Long createTx(ConnectionToken dc, Object xid) throws javax.jms.JMSException { + public final Tx createTx(ConnectionToken dc, Object xid) throws javax.jms.JMSException { GlobalXID gxid = new GlobalXID(dc, xid); synchronized(globalToLocal){ if( globalToLocal.containsKey(gxid) ) - throw new JMSException("Duplicate transaction from: "+dc.getClientID()+" xid="+xid); + throw new SpyJMSException("Duplicate transaction from: "+dc.getClientID()+" xid="+xid); } - Long txId = createTx(); + Tx txId = createTx(); synchronized(globalToLocal){ globalToLocal.put(gxid, txId); } @@ -213,21 +219,21 @@ addPostRollbackTask(txId, gxid); return txId; - } + } /** * Return the local transaction id for a distributed transaction id. */ - public final Long getPrepared(ConnectionToken dc, Object xid) throws javax.jms.JMSException { + public final Tx getPrepared(ConnectionToken dc, Object xid) throws javax.jms.JMSException { GlobalXID gxid = new GlobalXID(dc, xid); - Long txid; + Tx txid; synchronized(globalToLocal){ - txid = (Long)globalToLocal.get(gxid); + txid = (Tx)globalToLocal.get(gxid); } if( txid == null ) - throw new JMSException("Transaction does not exist from: "+dc.getClientID()+" xid="+xid); + throw new SpyJMSException("Transaction does not exist from: "+dc.getClientID()+" xid="+xid); return txid; - } + } } 1.1 jbossmq/src/main/org/jbossmq/pm/Tx.java Index: Tx.java =================================================================== package org.jbossmq.pm;

/**
 * Transaction identifier: a thin wrapper around a <code>long</code> value.
 *
 * Two instances are equal when they wrap the same value, so a copy that
 * was rebuilt through {@link #readExternal} can still be used to look up
 * entries stored under the original instance (TxManager keys its task
 * tables with Tx objects, e.g. <code>postCommitTasks.put(txId, ...)</code>).
 *
 * The value is mutable via {@link #setValue}; do not change it while the
 * instance is in use as a key in a hash-based collection, because the
 * hash code is derived from the value.
 *
 * NOTE(review): no serialVersionUID is declared, so adding a method changes
 * the default computed UID. If Tx objects are exchanged between peers built
 * from different revisions of this class, pin serialVersionUID explicitly
 * -- confirm against deployment.
 */
public class Tx implements Comparable, java.io.Serializable, java.io.Externalizable {

    /** The wrapped transaction id. */
    long value = 0;

    /** Creates a Tx with value 0; the public no-arg constructor is required by Externalizable. */
    public Tx() {
    }

    /**
     * Creates a Tx wrapping the given id.
     *
     * @param value the transaction id
     */
    public Tx(long value) {
        this.value = value;
    }

    /**
     * Replaces the wrapped id.
     *
     * @param tx the new transaction id
     */
    public void setValue(long tx) {
        value = tx;
    }

    /**
     * Folds the 64-bit value into 32 bits. Only the low 32 bits of the
     * shifted operand survive the cast, so the result is the same as
     * <code>java.lang.Long</code>'s hash for the same value.
     */
    public int hashCode() {
        return (int) (value ^ (value >> 32));
    }

    /**
     * Value-based equality, consistent with {@link #hashCode()} and
     * {@link #compareTo(Tx)}.
     *
     * @param obj the object to compare with, may be null
     * @return true if obj is a Tx wrapping the same value
     */
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof Tx)) {
            return false;
        }
        return value == ((Tx) obj).value;
    }

    /**
     * Numerically compares two transaction ids.
     *
     * @param anotherLong the Tx to compare against
     * @return -1, 0 or 1 as this value is less than, equal to or greater
     *         than the other value
     */
    public int compareTo(Tx anotherLong) {
        long thisVal = this.value;
        long anotherVal = anotherLong.value;
        if (thisVal < anotherVal) {
            return -1;
        }
        return (thisVal == anotherVal) ? 0 : 1;
    }

    /**
     * Comparable entry point; delegates to {@link #compareTo(Tx)}.
     *
     * @throws ClassCastException if o is not a Tx
     */
    public int compareTo(Object o) {
        return compareTo((Tx) o);
    }

    /** Restores the value from a single long read from the stream. */
    public void readExternal(java.io.ObjectInput in) throws java.io.IOException {
        value = in.readLong();
    }

    /** Writes the value to the stream as a single long. */
    public void writeExternal(java.io.ObjectOutput out) throws java.io.IOException {
        out.writeLong(value);
    }

    /** @return the wrapped transaction id */
    public long longValue() {
        return value;
    }

    /** @return the decimal string form of the wrapped id */
    public String toString() {
        return Long.toString(value);
    }
}
// _______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] http://lists.sourceforge.net/lists/listinfo/jboss-development