Author: mlovett
Date: Thu Nov 23 02:16:56 2006
New Revision: 478523

URL: http://svn.apache.org/viewvc?view=rev&rev=478523
Log:
Add locking to the in-memory storage manager, see SANDESHA2-49

Added:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
   (with props)
Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMBean.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMBean.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMBean.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMBean.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMBean.java
 Thu Nov 23 02:16:56 2006
@@ -19,6 +19,8 @@
 
 import java.io.Serializable;
 
+import org.apache.sandesha2.storage.Transaction;
+
 /**
  * Defines a data bean used and managed by Sandesha2.
  */
@@ -26,7 +28,7 @@
 public abstract class RMBean implements Serializable {
                
                private long id;
-               
+               private volatile Transaction transaction;
                
                public long getId() {
                        return id;
@@ -35,4 +37,11 @@
                protected void setId(long id) {
                        this.id = id;
                }
-}
\ No newline at end of file
+               public Transaction getTransaction() {
+                       return transaction;
+               }
+
+               public void setTransaction(Transaction transaction) {
+                       this.transaction = transaction;
+               }
+}

Added: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java?view=auto&rev=478523
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
 (added)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
 Thu Nov 23 02:16:56 2006
@@ -0,0 +1,148 @@
+package org.apache.sandesha2.storage.inmemory;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.axis2.context.AbstractContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.beans.RMBean;
+
+abstract class InMemoryBeanMgr {
+
+       private static final Log log = LogFactory.getLog(InMemoryBeanMgr.class);
+       private Hashtable table;
+       private InMemoryStorageManager mgr;
+
+       protected InMemoryBeanMgr(InMemoryStorageManager mgr, AbstractContext 
context, String key) {
+               this.mgr = mgr;
+               Object obj = context.getProperty(key);
+               if (obj != null) {
+                       table = (Hashtable) obj;
+               } else {
+                       table = new Hashtable();
+                       context.setProperty(key, table);
+               }
+       }
+       
+       protected boolean insert(Object key, RMBean bean) {
+               mgr.enlistBean(bean);
+               synchronized (table) {
+                       table.put(key, bean);
+               }
+               return true;
+       }
+
+       protected boolean delete(Object key) {
+               RMBean bean = null;
+               synchronized (table) {
+                       bean = (RMBean) table.get(key);
+               }
+               if(bean != null) {
+                       mgr.enlistBean(bean);
+                       synchronized (table) {
+                               bean = (RMBean) table.remove(key);
+                       }
+               }
+               return bean != null;
+       }
+
+       protected RMBean retrieve(Object key) {
+               RMBean bean = null;
+               synchronized (table) {
+                       bean = (RMBean) table.get(key);
+               }
+               if(bean != null) {
+                       mgr.enlistBean(bean);
+                       synchronized (table) {
+                               bean = (RMBean) table.get(key);
+                       }
+               }
+               return bean;
+       }
+
+       protected boolean update(Object key, RMBean bean) {
+               mgr.enlistBean(bean);
+               RMBean oldBean = null;
+               synchronized (table) {
+                       oldBean = (RMBean) table.get(key);
+                       table.put(key, bean);
+               }
+               if(oldBean == null) return false;
+               
+               mgr.enlistBean(oldBean);
+               return true;
+       }
+
+       protected List find(RMBean matchInfo) {
+               ArrayList beans = new ArrayList();
+               synchronized (table) {
+                       if(matchInfo == null) {
+                               beans.addAll(table.values());
+                       } else {
+                               Iterator i = table.values().iterator();
+                               while(i.hasNext()) {
+                                       RMBean candidate = (RMBean)i.next();
+                                       if(match(matchInfo, candidate)) {
+                                               beans.add(candidate);
+                                       }
+                               }
+                       }
+               }
+               
+               // Now we have a point-in-time view of the beans, lock them all
+               Iterator i = beans.iterator();
+               while(i.hasNext()) mgr.enlistBean((RMBean) i.next());
+               
+               // Finally remove any beans that are no longer in the table
+               synchronized (table) {
+                       i = beans.iterator();
+                       while(i.hasNext()) {
+                               RMBean bean = (RMBean) i.next();
+                               if(!table.containsValue(bean)) {
+                                       i.remove();
+                               }
+                       }
+               }
+
+               return beans;
+       }
+
+       protected RMBean findUnique (RMBean matchInfo) throws SandeshaException 
{
+               RMBean result = null;
+               synchronized (table) {
+                       Iterator i = table.values().iterator();
+                       while(i.hasNext()) {
+                               RMBean candidate = (RMBean)i.next();
+                               if(match(matchInfo, candidate)) {
+                                       if(result == null) {
+                                               result = candidate;
+                                       } else {
+                                               String message = 
SandeshaMessageHelper.getMessage(
+                                                               
SandeshaMessageKeys.nonUniqueResult);
+                                               log.error(message);
+                                               throw new SandeshaException 
(message);
+                                       }
+                               }
+                       }
+               }
+               
+               // Now we have a point-in-time view of the bean, lock it, and
+               // double-check that it is still in the table
+               if(result != null) {
+                       mgr.enlistBean(result);
+                       synchronized (table) {
+                               if(!table.containsValue(result)) result = null;
+                       }
+               }
+               
+               return result;
+       }
+
+       protected abstract boolean match(RMBean matchInfo, RMBean bean);
+}

Propchange: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java
 Thu Nov 23 02:16:56 2006
@@ -18,10 +18,6 @@
 package org.apache.sandesha2.storage.inmemory;
 
 import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Hashtable;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.axis2.context.AbstractContext;
@@ -33,96 +29,61 @@
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.RMBean;
 
-public class InMemoryCreateSeqBeanMgr implements CreateSeqBeanMgr {
+public class InMemoryCreateSeqBeanMgr extends InMemoryBeanMgr implements 
CreateSeqBeanMgr {
 
        private static final Log log = 
LogFactory.getLog(InMemoryCreateSeqBeanMgr.class);
-       private Hashtable table = null;
-       
 
-       public InMemoryCreateSeqBeanMgr(AbstractContext context) {
-               Object obj = 
context.getProperty(Sandesha2Constants.BeanMAPs.CREATE_SEQUECE);
-               if (obj != null) {
-                       table = (Hashtable) obj;
-               } else {
-                       table = new Hashtable();
-                       
context.setProperty(Sandesha2Constants.BeanMAPs.CREATE_SEQUECE, table);
-               }
+       public InMemoryCreateSeqBeanMgr(InMemoryStorageManager mgr, 
AbstractContext context) {
+               super(mgr, context, Sandesha2Constants.BeanMAPs.CREATE_SEQUECE);
        }
 
-       public synchronized boolean insert(CreateSeqBean bean) {
-               table.put(bean.getCreateSeqMsgID(), bean);
-               return true;
+       public boolean insert(CreateSeqBean bean) {
+               return super.insert(bean.getCreateSeqMsgID(), bean);
        }
 
-       public synchronized boolean delete(String msgId) {
-               return table.remove(msgId) != null;
+       public boolean delete(String msgId) {
+               return super.delete(msgId);
        }
 
-       public synchronized CreateSeqBean retrieve(String msgId) {
-               return (CreateSeqBean) table.get(msgId);
+       public CreateSeqBean retrieve(String msgId) {
+               return (CreateSeqBean) super.retrieve(msgId);
        }
 
-       public synchronized boolean update(CreateSeqBean bean) {
-               if (table.get(bean.getCreateSeqMsgID())==null)
-                       return false;
-
-               return table.put(bean.getCreateSeqMsgID(), bean) != null;
+       public boolean update(CreateSeqBean bean) {
+               return super.update(bean.getCreateSeqMsgID(), bean);
        }
 
-       public synchronized List find(CreateSeqBean bean) {
-               ArrayList beans = new ArrayList();
-               if (bean == null)
-                       return beans;
-
-               Iterator iterator = table.values().iterator();
-
-               CreateSeqBean temp;
-               while (iterator.hasNext()) {
-                       temp = (CreateSeqBean) iterator.next();
-
-                       boolean equal = true;
-
-                       if (bean.getCreateSeqMsgID() != null
-                                       && !bean.getCreateSeqMsgID().equals(
-                                                       
temp.getCreateSeqMsgID()))
-                               equal = false;
+       public List find(CreateSeqBean bean) {
+               return super.find(bean);
+       }
+       
+       protected boolean match(RMBean matchInfo, RMBean candidate) {
+               boolean equal = true;
+               
+               CreateSeqBean bean = (CreateSeqBean) matchInfo;
+               CreateSeqBean temp = (CreateSeqBean) candidate;
 
-                       if (bean.getSequenceID() != null
-                                       && 
!bean.getSequenceID().equals(temp.getSequenceID()))
-                               equal = false;
+               if (bean.getCreateSeqMsgID() != null
+                               && !bean.getCreateSeqMsgID().equals(
+                                               temp.getCreateSeqMsgID()))
+                       equal = false;
 
-                       if (bean.getInternalSequenceID() != null
-                                       && !bean.getInternalSequenceID().equals(
-                                                       
temp.getInternalSequenceID()))
-                               equal = false;
+               if (bean.getSequenceID() != null
+                               && 
!bean.getSequenceID().equals(temp.getSequenceID()))
+                       equal = false;
 
-                       if (equal)
-                               beans.add(temp);
+               if (bean.getInternalSequenceID() != null
+                               && !bean.getInternalSequenceID().equals(
+                                               temp.getInternalSequenceID()))
+                       equal = false;
 
-               }
-               return beans;
+               return equal;
        }
 
-       public synchronized ResultSet find(String query) {
-               throw new 
UnsupportedOperationException(SandeshaMessageHelper.getMessage(
-                               SandeshaMessageKeys.selectRSNotSupported));
-       }
-       
-       public synchronized CreateSeqBean findUnique (CreateSeqBean bean) 
throws SandeshaException {
-               Collection coll = find(bean);
-               if (coll.size()>1) {
-                       String message = SandeshaMessageHelper.getMessage(
-                                       SandeshaMessageKeys.nonUniqueResult);
-                       log.error(message);
-                       throw new SandeshaException (message);
-               }
-               
-               Iterator iter = coll.iterator();
-               if (iter.hasNext())
-                       return (CreateSeqBean) iter.next();
-               else 
-                       return null;
+       public CreateSeqBean findUnique (CreateSeqBean bean) throws 
SandeshaException {
+               return (CreateSeqBean) super.findUnique(bean);
        }
 
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
 Thu Nov 23 02:16:56 2006
@@ -33,89 +33,60 @@
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.RMBean;
 
-public class InMemoryInvokerBeanMgr implements InvokerBeanMgr {
+public class InMemoryInvokerBeanMgr extends InMemoryBeanMgr implements 
InvokerBeanMgr {
        
        private static final Log log = 
LogFactory.getLog(InMemoryInvokerBeanMgr.class);
-       private Hashtable table = null;
 
-       public InMemoryInvokerBeanMgr(AbstractContext context) {
-               Object obj = 
context.getProperty(Sandesha2Constants.BeanMAPs.STORAGE_MAP);
-               if (obj != null) {
-                       table = (Hashtable) obj;
-               } else {
-                       table = new Hashtable();
-                       
context.setProperty(Sandesha2Constants.BeanMAPs.STORAGE_MAP, table);
-               }
+       public InMemoryInvokerBeanMgr(InMemoryStorageManager mgr, 
AbstractContext context) {
+               super(mgr, context, Sandesha2Constants.BeanMAPs.STORAGE_MAP);
        }
 
-       public synchronized boolean insert(InvokerBean bean) {
-               table.put(bean.getMessageContextRefKey(), bean);
-               return true;
+       public boolean insert(InvokerBean bean) {
+               return super.insert(bean.getMessageContextRefKey(), bean);
        }
 
-       public synchronized boolean delete(String key) {
-               return table.remove(key) != null;
+       public boolean delete(String key) {
+               return super.delete(key);
        }
 
-       public synchronized InvokerBean retrieve(String key) {
-               return (InvokerBean) table.get(key);
+       public InvokerBean retrieve(String key) {
+               return (InvokerBean) super.retrieve(key);
        }
 
-       public synchronized ResultSet find(String query) {
-               throw new 
UnsupportedOperationException(SandeshaMessageHelper.getMessage(
-                               SandeshaMessageKeys.selectRSNotSupported));
+       public List find(InvokerBean bean) {
+               return super.find(bean);
        }
+       
+       protected boolean match(RMBean matchInfo, RMBean candidate) {
+               InvokerBean bean = (InvokerBean) matchInfo;
+               InvokerBean temp = (InvokerBean) candidate;
 
-       public synchronized List find(InvokerBean bean) {
-               ArrayList beans = new ArrayList();
-               Iterator iterator = table.values().iterator();
-
-               InvokerBean temp = new InvokerBean();
-               while (iterator.hasNext()) {
-                       temp = (InvokerBean) iterator.next();
-                       boolean select = true;
-
-                       if (bean.getMessageContextRefKey() != null && 
!bean.getMessageContextRefKey().equals(temp.getMessageContextRefKey()))
-                               select = false;
+               boolean select = true;
 
-                       if (bean.getMsgNo() != 0 && bean.getMsgNo() != 
temp.getMsgNo())
-                               select = false;
+               if (bean.getMessageContextRefKey() != null && 
!bean.getMessageContextRefKey().equals(temp.getMessageContextRefKey()))
+                       select = false;
 
-                       if (bean.getSequenceID() != null
-                                       && 
!bean.getSequenceID().equals(temp.getSequenceID()))
-                               select = false;
-                       
-                       if (bean.isInvoked()!=temp.isInvoked())
-                               select = false;
+               if (bean.getMsgNo() != 0 && bean.getMsgNo() != temp.getMsgNo())
+                       select = false;
 
-                       if (select)
-                               beans.add(temp);
-               }
-               return beans;
+               if (bean.getSequenceID() != null
+                               && 
!bean.getSequenceID().equals(temp.getSequenceID()))
+                       select = false;
+               
+               if (bean.isInvoked()!=temp.isInvoked())
+                       select = false;
+               
+               return select;
        }
 
-       public synchronized boolean update(InvokerBean bean) {
-               if (table.get(bean.getMessageContextRefKey())==null)
-                       return false;
-
-               return table.put(bean.getMessageContextRefKey(), bean) != null;
+       public boolean update(InvokerBean bean) {
+               return super.update(bean.getMessageContextRefKey(), bean);
        }
        
-       public synchronized InvokerBean findUnique (InvokerBean bean) throws 
SandeshaException {
-               Collection coll = find(bean);
-               if (coll.size()>1) {
-                       String message = SandeshaMessageHelper.getMessage(
-                                       SandeshaMessageKeys.nonUniqueResult);
-                       log.error(message);
-                       throw new SandeshaException (message);
-               }
-               
-               Iterator iter = coll.iterator();
-               if (iter.hasNext())
-                       return (InvokerBean) iter.next();
-               else 
-                       return null;
+       public InvokerBean findUnique(InvokerBean bean) throws 
SandeshaException {
+               return (InvokerBean) super.findUnique(bean);
        }
 
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java
 Thu Nov 23 02:16:56 2006
@@ -33,94 +33,59 @@
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.RMBean;
 
-public class InMemoryNextMsgBeanMgr implements NextMsgBeanMgr {
+public class InMemoryNextMsgBeanMgr extends InMemoryBeanMgr implements 
NextMsgBeanMgr {
 
        private static final Log log = 
LogFactory.getLog(InMemoryNextMsgBeanMgr.class);
-       private Hashtable table = null;
 
-       public InMemoryNextMsgBeanMgr(AbstractContext context) {
-               Object obj = 
context.getProperty(Sandesha2Constants.BeanMAPs.NEXT_MESSAGE);
-
-               if (obj != null) {
-                       table = (Hashtable) obj;
-               } else {
-                       table = new Hashtable();
-                       
context.setProperty(Sandesha2Constants.BeanMAPs.NEXT_MESSAGE, table);
-               }
+       public InMemoryNextMsgBeanMgr(InMemoryStorageManager mgr, 
AbstractContext context) {
+               super(mgr, context, Sandesha2Constants.BeanMAPs.NEXT_MESSAGE);
        }
 
-       public synchronized boolean delete(String sequenceId) {
-               return table.remove(sequenceId) != null;
+       public boolean delete(String sequenceId) {
+               return super.delete(sequenceId);
        }
 
-       public synchronized NextMsgBean retrieve(String sequenceId) {
-               return (NextMsgBean) table.get(sequenceId);
+       public NextMsgBean retrieve(String sequenceId) {
+               return (NextMsgBean) super.retrieve(sequenceId);
        }
 
-       public synchronized boolean insert(NextMsgBean bean) {
-               table.put(bean.getSequenceID(), bean);
-               return true;
+       public boolean insert(NextMsgBean bean) {
+               return super.insert(bean.getSequenceID(), bean);
        }
 
-       public synchronized ResultSet find(String query) {
-               throw new 
UnsupportedOperationException(SandeshaMessageHelper.getMessage(
-                               SandeshaMessageKeys.selectRSNotSupported));
+       public List find(NextMsgBean bean) {
+               return super.find(bean);
        }
+       
+       protected boolean match(RMBean matchInfo, RMBean candidate) {
+               NextMsgBean bean = (NextMsgBean) matchInfo;
+               NextMsgBean temp = (NextMsgBean) candidate;
 
-       public synchronized List find(NextMsgBean bean) {
-               ArrayList beans = new ArrayList();
-               Iterator iterator = table.values().iterator();
-
-               if (bean == null)
-                       return beans;
-
-               NextMsgBean temp;
-               while (iterator.hasNext()) {
-                       temp = (NextMsgBean) iterator.next();
-
-                       boolean equal = true;
-
-                       if (bean.getNextMsgNoToProcess() > 0
-                                       && bean.getNextMsgNoToProcess() != temp
-                                                       
.getNextMsgNoToProcess())
-                               equal = false;
+               boolean equal = true;
 
-                       if (bean.getSequenceID() != null
-                                       && 
!bean.getSequenceID().equals(temp.getSequenceID()))
-                               equal = false;
+               if (bean.getNextMsgNoToProcess() > 0
+                               && bean.getNextMsgNoToProcess() != temp
+                                               .getNextMsgNoToProcess())
+                       equal = false;
 
-                       if (equal)
-                               beans.add(temp);
+               if (bean.getSequenceID() != null
+                               && 
!bean.getSequenceID().equals(temp.getSequenceID()))
+                       equal = false;
 
-               }
-               return beans;
+               return equal;
        }
 
-       public synchronized boolean update(NextMsgBean bean) {
-               if (table.get(bean.getSequenceID())==null)
-                       return false;
-
-               return table.put(bean.getSequenceID(), bean) != null;
+       public boolean update(NextMsgBean bean) {
+               return super.update(bean.getSequenceID(), bean);
        }
 
-       public synchronized Collection retrieveAll() {
-               return table.values();
+       public Collection retrieveAll() {
+               return super.find(null);
        }
        
-       public synchronized NextMsgBean findUnique(NextMsgBean bean) throws 
SandeshaException {
-               Collection coll = find(bean);
-               if (coll.size()>1) {
-                       String message = SandeshaMessageHelper.getMessage(
-                                       SandeshaMessageKeys.nonUniqueResult);
-                       log.error(message);
-                       throw new SandeshaException (message);
-               }
-               
-               Iterator iter = coll.iterator();
-               if (iter.hasNext())
-                       return (NextMsgBean) iter.next();
-               else 
-                       return null;
+       public NextMsgBean findUnique(NextMsgBean bean) throws 
SandeshaException {
+               return (NextMsgBean) super.findUnique(bean);
        }
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
 Thu Nov 23 02:16:56 2006
@@ -17,9 +17,6 @@
 
 package org.apache.sandesha2.storage.inmemory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.List;
 
@@ -31,218 +28,144 @@
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.SandeshaStorageException;
-import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.util.SandeshaUtil;
 
-public class InMemorySenderBeanMgr implements SenderBeanMgr {
+public class InMemorySenderBeanMgr extends InMemoryBeanMgr implements 
SenderBeanMgr {
        
        private static final Log log = 
LogFactory.getLog(InMemorySenderBeanMgr.class);
-       private Hashtable table = null;
 
-       public InMemorySenderBeanMgr(AbstractContext context) {
-               Object obj = 
context.getProperty(Sandesha2Constants.BeanMAPs.RETRANSMITTER);
-               if (obj != null) {
-                       table = (Hashtable) obj;
-               } else {
-                       table = new Hashtable();
-                       
context.setProperty(Sandesha2Constants.BeanMAPs.RETRANSMITTER, table);
-               }
+       public InMemorySenderBeanMgr(InMemoryStorageManager mgr, 
AbstractContext context) {
+               super(mgr, context, Sandesha2Constants.BeanMAPs.RETRANSMITTER);
        }
 
-       public synchronized boolean delete(String MessageId) {
-               return table.remove(MessageId) != null;
+       public boolean delete(String MessageId) {
+               return super.delete(MessageId);
        }
 
-       public synchronized SenderBean retrieve(String MessageId) {
-               return (SenderBean) table.get(MessageId);
+       public SenderBean retrieve(String MessageId) {
+               return (SenderBean) super.retrieve(MessageId);
        }
 
-       public synchronized boolean insert(SenderBean bean) throws 
SandeshaStorageException {
+       public boolean insert(SenderBean bean) throws SandeshaStorageException {
                if (bean.getMessageID() == null)
                        throw new 
SandeshaStorageException(SandeshaMessageHelper.getMessage(
                                        SandeshaMessageKeys.nullMsgId));
-               table.put(bean.getMessageID(), bean);
-               return true;
+               return super.insert(bean.getMessageID(), bean);
        }
 
-       public synchronized List find(String internalSequenceID) {
+       public List find(String internalSequenceID) {
+               SenderBean temp = new SenderBean();
+               temp.setInternalSequenceID(internalSequenceID);
+               return super.find(temp);
+       }
+       
+       protected boolean match(RMBean matchInfo, RMBean candidate) {
+               if(log.isDebugEnabled()) log.debug("Entry: 
InMemorySenderBeanMgr::match");
+               SenderBean bean = (SenderBean)matchInfo;
+               SenderBean temp = (SenderBean) candidate;
                
-               ArrayList arrayList = new ArrayList ();
-               if (internalSequenceID==null || "".equals(internalSequenceID))
-                       return arrayList;
+               boolean add = true;
+
+               if (bean.getMessageContextRefKey() != null && 
!bean.getMessageContextRefKey().equals(temp.getMessageContextRefKey())) {
+                       log.debug("MessageContextRefKey didn't match");
+                       add = false;
+               }
+               // Time is a bit special - we match all the beans that should 
be sent
+               // before the moment in time that the match criteria give us.
+               if (bean.getTimeToSend() > 0
+                               && bean.getTimeToSend() < temp.getTimeToSend()) 
{
+                       log.debug("MessageContextRefKey didn't match");
+                       add = false;
+               }
                
-               Iterator iterator = table.keySet().iterator();
+               if (bean.getMessageID() != null
+                               && 
!bean.getMessageID().equals(temp.getMessageID())) {
+                       log.debug("MessageID didn't match");
+                       add = false;
+               }
                
-               while (iterator.hasNext()) {
-                       SenderBean senderBean = (SenderBean) 
table.get(iterator.next());
-                       if 
(internalSequenceID.equals(senderBean.getInternalSequenceID())) 
-                                       arrayList.add(senderBean);
+               if (bean.getInternalSequenceID() != null
+                               && !bean.getInternalSequenceID().equals("")
+                               && !bean.getInternalSequenceID().equals(
+                                               temp.getInternalSequenceID())) {
+                       log.debug("InternalSequenceID didn't match");
+                       add = false;
                }
                
-               return arrayList;
-       }
-
-       public synchronized List find(SenderBean bean) {
-               ArrayList beans = new ArrayList();
-               Iterator iterator = ((Hashtable) table).values().iterator();
-
-               SenderBean temp;
-               while (iterator.hasNext()) {
-
-                       temp = (SenderBean) iterator.next();
-
-                       
-                       boolean add = true;
-
-                       if (bean.getMessageContextRefKey() != null && 
!bean.getMessageContextRefKey().equals(temp.getMessageContextRefKey()))
-                               add = false;
-
-                       if (bean.getTimeToSend() > 0
-                                       && bean.getTimeToSend() != 
temp.getTimeToSend())
-                               add = false;
-
-                       if (bean.getMessageID() != null
-                                       && 
!bean.getMessageID().equals(temp.getMessageID()))
-                               add = false;
-
-                       if (bean.getInternalSequenceID() != null
-                                       && !bean.getInternalSequenceID().equals(
-                                                       
temp.getInternalSequenceID()))
-                               add = false;
-
-                       if (bean.getMessageNumber() > 0
-                                       && bean.getMessageNumber() != 
temp.getMessageNumber())
-                               add = false;
-
-                       if (bean.getMessageType() != 
Sandesha2Constants.MessageTypes.UNKNOWN
-                                       && bean.getMessageType() != 
temp.getMessageType())
-                               add = false;
-                       
-                       if (bean.isSend() != temp.isSend())
-                               add = false;
+               if (bean.getMessageNumber() > 0
+                               && bean.getMessageNumber() != 
temp.getMessageNumber()) {
+                       log.debug("MessageNumber didn't match");
+                       add = false;
+               }
+               
+               if (bean.getMessageType() != 
Sandesha2Constants.MessageTypes.UNKNOWN
+                               && bean.getMessageType() != 
temp.getMessageType()) {
+                       log.debug("MessageType didn't match");
+                       add = false;
+               }
 
-                       if (bean.isReSend() != temp.isReSend())
-                               add = false;
-                       
-                       if (add)
-                               beans.add(temp);
+               if (bean.isSend() != temp.isSend()) {
+                       log.debug("isSend didn't match");
+                       add = false;
                }
 
-               return beans;
+               // Do not use the isReSend flag to match messages, as it can 
stop us from
+               // detecting RM messages during 'getNextMsgToSend'
+               //if (bean.isReSend() != temp.isReSend()) {
+               //      log.debug("isReSend didn't match");
+               //      add = false;
+               //}
+
+               if(log.isDebugEnabled()) log.debug("Exit: 
InMemorySenderBeanMgr::match, " + add);
+               return add;
        }
 
-       public synchronized SenderBean getNextMsgToSend() {
-               
-               Iterator iterator = table.keySet().iterator();
+       public List find(SenderBean bean) {
+               return super.find(bean);
+       }
 
-               //TODO
-               //pick a random sequence out of the sequences to be sent.
-               
+       public SenderBean getNextMsgToSend() {
+               // Set up match criteria
+               SenderBean matcher = new SenderBean();
+               matcher.setSend(true);
+               matcher.setTimeToSend(System.currentTimeMillis());
                
-               long lowestAppMsgNo = 0;
-               while (iterator.hasNext()) {
-                       Object key = iterator.next();
-                       SenderBean temp = (SenderBean) table.get(key);
-                       if (temp.isSend()) {
-                               long timeToSend = temp.getTimeToSend();
-                               long timeNow = System.currentTimeMillis();
-                               if ((timeNow >= timeToSend)) {
-                                       if 
(temp.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION) {
-                                               long msgNo = 
temp.getMessageNumber();
-                                               if (lowestAppMsgNo==0) {
-                                                       lowestAppMsgNo=msgNo;
-                                               }else {
-                                                       if 
(msgNo<lowestAppMsgNo)
-                                                               lowestAppMsgNo 
= msgNo;
-                                               }
-                                       }
-                               }
-                       }
-               }
+               List matches = super.find(matcher);
                
-               iterator = table.keySet().iterator();   
-               while (iterator.hasNext()) {
-                       Object key = iterator.next();
-                       SenderBean temp = (SenderBean) table.get(key);
-
-                       if (temp.isSend()) {
-
-                               long timeToSend = temp.getTimeToSend();
-                               long timeNow = System.currentTimeMillis();
-                               if ((timeNow >= timeToSend)) {
-                                       boolean valid = false;
-                                       if 
(temp.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION) {
-                                               if 
(temp.getMessageNumber()==lowestAppMsgNo)
-                                                       valid = true;
-                                       }else {
-                                               valid = true;
-                                       }
-                                       
-                                       if (valid) {
-                                           updateNextSendingTime (temp);
-                                           return temp;
-                                       }
+               // We either return an application message or an RM message. If 
we find
+               // an application message first then we carry on through the 
list to be
+               // sure that we send the lowest app message available. If we 
hit an RM
+               // message first then we are done.
+               SenderBean result = null;
+               Iterator i = matches.iterator();
+               while(i.hasNext()) {
+                       SenderBean bean = (SenderBean) i.next();
+                       if(bean.getMessageType() == 
Sandesha2Constants.MessageTypes.APPLICATION) {
+                               long number = bean.getMessageNumber();
+                               if(result == null || result.getMessageNumber() 
> number) {
+                                       result = bean;
                                }
+                       } else if(result == null) {
+                               result = bean;
+                               break;
                        }
                }
                
-               return null;
+               return result;
        }
-
-       private void updateNextSendingTime (SenderBean bean) {
-               
+       
+       public boolean update(SenderBean bean) {
+               return super.update(bean.getMessageID(), bean);
        }
        
-       private synchronized ArrayList findBeansWithMsgNo(ArrayList list, long 
msgNo) {
-               ArrayList beans = new ArrayList();
-
-               Iterator it = list.iterator();
-               while (it.hasNext()) {
-                       SenderBean bean = (SenderBean) it.next();
-                       if (bean.getMessageNumber() == msgNo)
-                               beans.add(bean);
-               }
-
-               return beans;
+       public SenderBean findUnique(SenderBean bean) throws SandeshaException {
+               return (SenderBean) super.findUnique(bean);
        }
 
-       public synchronized boolean update(SenderBean bean) {
-               if (table.get(bean.getMessageID())==null)
-                       return false;
-
-               return true; //No need to update. Being a reference does the 
job.
-       }
-       
-       public synchronized SenderBean findUnique(SenderBean bean) throws 
SandeshaException {
-               Collection coll = find(bean);
-               if (coll.size()>1) {
-                       String message = SandeshaMessageHelper.getMessage(
-                                       SandeshaMessageKeys.nonUniqueResult);
-                       log.error(message);
-                       throw new SandeshaException (message);
-               }
-               
-               Iterator iter = coll.iterator();
-               if (iter.hasNext())
-                       return (SenderBean) iter.next();
-               else 
-                       return null;
-       }
-
-       public synchronized SenderBean retrieveFromMessageRefKey(String 
messageContextRefKey) {
-               
-               Iterator iter = table.keySet().iterator();
-               while (iter.hasNext()) {
-                       Object key = iter.next();
-                       SenderBean bean = (SenderBean) table.get(key);
-                       if 
(bean.getMessageContextRefKey().equals(messageContextRefKey)) {
-                               return bean;
-                       }
-               }
-               
-               return null;
+       public SenderBean retrieveFromMessageRefKey(String 
messageContextRefKey) {
+               throw new UnsupportedOperationException("Deprecated method");
        }
        
        

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java
 Thu Nov 23 02:16:56 2006
@@ -17,11 +17,7 @@
 
 package org.apache.sandesha2.storage.inmemory;
 
-import java.sql.ResultSet;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Hashtable;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.axis2.context.AbstractContext;
@@ -29,129 +25,76 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.RMBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 
-public class InMemorySequencePropertyBeanMgr implements 
SequencePropertyBeanMgr {
+public class InMemorySequencePropertyBeanMgr extends InMemoryBeanMgr 
implements SequencePropertyBeanMgr {
        
        private static final Log log = 
LogFactory.getLog(InMemorySequencePropertyBeanMgr.class);
-       private Hashtable table = null;
 
-       public InMemorySequencePropertyBeanMgr(AbstractContext context) {
-               Object obj = 
context.getProperty(Sandesha2Constants.BeanMAPs.SEQUENCE_PROPERTY);
-               if (obj != null) {
-                       table = (Hashtable) obj;
-               } else {
-                       table = new Hashtable();
-                       
context.setProperty(Sandesha2Constants.BeanMAPs.SEQUENCE_PROPERTY, table);
-               }
+       public InMemorySequencePropertyBeanMgr(InMemoryStorageManager mgr, 
AbstractContext context) {
+               super(mgr, context, 
Sandesha2Constants.BeanMAPs.SEQUENCE_PROPERTY);
        }
 
-       public synchronized boolean delete(String sequenceId, String name) {
-               
-               SequencePropertyBean bean = retrieve( sequenceId,name);
-                               
-               return table.remove(sequenceId + ":" + name) != null;
+       public boolean delete(String sequenceId, String name) {
+               return super.delete(getId(sequenceId, name));
        }
 
-       public synchronized SequencePropertyBean retrieve(String sequenceId, 
String name) {
-               return (SequencePropertyBean) table.get(sequenceId + ":" + 
name);
+       public SequencePropertyBean retrieve(String sequenceId, String name) {
+               return (SequencePropertyBean) super.retrieve(getId(sequenceId, 
name));
        }
 
-       public synchronized boolean insert(SequencePropertyBean bean) {
-               table.put(bean.getSequencePropertyKey() + ":" + bean.getName(), 
bean);
-               return true;
+       public boolean insert(SequencePropertyBean bean) {
+               return super.insert(getId(bean), bean);
        }
 
-       public synchronized ResultSet find(String query) {
-               throw new 
UnsupportedOperationException(SandeshaMessageHelper.getMessage(
-                               SandeshaMessageKeys.selectRSNotSupported));
+       public List find(SequencePropertyBean bean) {
+               return super.find(bean);
        }
+       
+       protected boolean match(RMBean matchInfo, RMBean candidate) {
+               SequencePropertyBean bean = (SequencePropertyBean) matchInfo;
+               SequencePropertyBean temp = (SequencePropertyBean) candidate;
 
-       public synchronized List find(SequencePropertyBean bean) {
-               ArrayList beans = new ArrayList();
-
-               if (bean == null)
-                       return beans;
-
-               Iterator iterator = table.values().iterator();
-               SequencePropertyBean temp;
-
-               while (iterator.hasNext()) {
-                       temp = (SequencePropertyBean) iterator.next();
-
-                       boolean equal = true;
-
-                       if (bean.getSequencePropertyKey() != null
-                                       && 
!bean.getSequencePropertyKey().equals(temp.getSequencePropertyKey()))
-                               equal = false;
+               boolean equal = true;
 
-                       if (bean.getName() != null
-                                       && 
!bean.getName().equals(temp.getName()))
-                               equal = false;
+               if (bean.getSequencePropertyKey() != null
+                               && 
!bean.getSequencePropertyKey().equals(temp.getSequencePropertyKey()))
+                       equal = false;
 
-                       if (bean.getValue() != null
-                                       && 
!bean.getValue().equals(temp.getValue()))
-                               equal = false;
+               if (bean.getName() != null
+                               && !bean.getName().equals(temp.getName()))
+                       equal = false;
 
-                       if (equal)
-                               beans.add(temp);
+               if (bean.getValue() != null
+                               && !bean.getValue().equals(temp.getValue()))
+                       equal = false;
 
-               }
-               return beans;
+               return equal;
        }
 
-       public synchronized boolean update(SequencePropertyBean bean) { 
-               
-               if (table.get(getId(bean))==null)
-                       return false;
-
-               return table.put(getId(bean), bean) != null;
-
+       public boolean update(SequencePropertyBean bean) {      
+               return super.update(getId(bean), bean);
        }
        
-       public synchronized boolean updateOrInsert(SequencePropertyBean bean) { 
-               
-               if (table.get(getId(bean))==null)
-                       table.put(getId(bean), bean);
-
-               return table.put(getId(bean), bean) != null;
-
+       public boolean updateOrInsert(SequencePropertyBean bean) {      
+               throw new UnsupportedOperationException("Deprecated method");
        }
 
        private String getId(SequencePropertyBean bean) {
                return bean.getSequencePropertyKey() + ":" + bean.getName();
        }
+       private String getId(String sequenceId, String name) {
+               return sequenceId + ":" + name;
+       }
        
-       public synchronized SequencePropertyBean 
findUnique(SequencePropertyBean bean) throws SandeshaException {
-               Collection coll = find(bean);
-               if (coll.size()>1) {
-                       String message = SandeshaMessageHelper.getMessage(
-                                       SandeshaMessageKeys.nonUniqueResult);
-                       log.error(message);
-                       throw new SandeshaException (message);
-               }
-               
-               Iterator iter = coll.iterator();
-               if (iter.hasNext())
-                       return (SequencePropertyBean) iter.next();
-               else 
-                       return null;
-       }
-
-       public synchronized Collection retrieveAll() {
-               Collection coll = new ArrayList();
-               
-               Iterator keys = table.keySet().iterator();
-               while (keys.hasNext()) {
-                       Object key = keys.next();
-                       coll.add(table.get(key));
-               }
-               
-               return coll;
+       public SequencePropertyBean findUnique(SequencePropertyBean bean) 
throws SandeshaException {
+               return (SequencePropertyBean) super.findUnique(bean);
        }
 
+       public Collection retrieveAll() {
+               return super.find(null);
+       }
        
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 Thu Nov 23 02:16:56 2006
@@ -17,12 +17,15 @@
 
 package org.apache.sandesha2.storage.inmemory;
 
+import java.util.Collection;
 import java.util.HashMap;
 
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.AxisModule;
 import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.SandeshaStorageException;
@@ -33,10 +36,13 @@
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.RMBean;
 import org.apache.sandesha2.util.SandeshaUtil;
 
 public class InMemoryStorageManager extends StorageManager {
 
+       private static Log log = 
LogFactory.getLog(InMemoryStorageManager.class);
+
        private static InMemoryStorageManager instance = null;
     private final String MESSAGE_MAP_KEY = "Sandesha2MessageMap";
     private CreateSeqBeanMgr  createSeqBeanMgr = null;
@@ -44,21 +50,44 @@
     private SequencePropertyBeanMgr sequencePropertyBeanMgr = null;
     private SenderBeanMgr senderBeanMgr = null;
     private InvokerBeanMgr invokerBeanMgr = null;
+    private HashMap transactions = new HashMap();
     
        public InMemoryStorageManager(ConfigurationContext context) {
                super(context);
                
-               this.createSeqBeanMgr = new InMemoryCreateSeqBeanMgr (context);
-               this.nextMsgBeanMgr = new InMemoryNextMsgBeanMgr (context);
-               this.senderBeanMgr = new InMemorySenderBeanMgr (context);
-               this.invokerBeanMgr = new InMemoryInvokerBeanMgr (context);
-               this.sequencePropertyBeanMgr = new 
InMemorySequencePropertyBeanMgr (context);
+               this.createSeqBeanMgr = new InMemoryCreateSeqBeanMgr (this, 
context);
+               this.nextMsgBeanMgr = new InMemoryNextMsgBeanMgr (this, 
context);
+               this.senderBeanMgr = new InMemorySenderBeanMgr (this, context);
+               this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this, 
context);
+               this.sequencePropertyBeanMgr = new 
InMemorySequencePropertyBeanMgr (this, context);
        }
 
        public Transaction getTransaction() {
-               return new InMemoryTransaction();
+               Transaction result = null;
+               synchronized (transactions) {
+                       Long key = new Long(Thread.currentThread().getId());
+                       String name = Thread.currentThread().getName();
+                       result = (Transaction) transactions.get(key);
+                       if(result == null) {
+                               result = new InMemoryTransaction(this, key, 
name);
+                               transactions.put(key, result);
+                       }
+               }
+               return result;
        }
-
+       
+       void removeTransaction(Transaction t) {
+               synchronized (transactions) {
+                       Collection entries = transactions.values();
+                       entries.remove(t);
+               }
+       }
+       
+       void enlistBean(RMBean bean) {
+               InMemoryTransaction t = (InMemoryTransaction) getTransaction();
+               t.enlist(bean);
+       }
+       
        public CreateSeqBeanMgr getCreateSeqBeanMgr() {
                return createSeqBeanMgr;
        }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
 Thu Nov 23 02:16:56 2006
@@ -17,25 +17,107 @@
 
 package org.apache.sandesha2.storage.inmemory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beans.RMBean;
 
 /**
- * This class does nothing special. Just to be consistent with the transaction
- * based behavious of other 'Persistant' StorageManager implementations.
+ * This class does not really implement transactions, but it is a good
+ * place to implement locking for the in memory storage manager.
  */
 
 public class InMemoryTransaction implements Transaction {
 
-       public void commit() {
+       private static final Log log = LogFactory.getLog(InMemoryTransaction.class);
 
+       // Storage manager that created this transaction; told when locks are released.
+       private final InMemoryStorageManager manager;
+       // Identifier handed in by the manager (it passes the creating thread's id).
+       private final Long key;
+       // Name of the creating thread; only used for diagnostics in toString().
+       private final String threadName;
+       // Beans currently locked by this transaction.
+       // NOTE(review): not synchronized - presumably only touched by the owning thread; confirm.
+       private final ArrayList enlistedBeans = new ArrayList();
+       
+       /**
+        * Creates a transaction bound to the given manager.
+        *
+        * @param manager    storage manager to deregister from when locks are released
+        * @param key        identifier shown in toString()
+        * @param threadName thread name shown in toString()
+        */
+       InMemoryTransaction(InMemoryStorageManager manager, Long key, String threadName) {
+               if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::<init>");
+               this.manager = manager;
+               this.key = key;
+               this.threadName = threadName;
+               if(log.isDebugEnabled()) log.debug("Exit: InMemoryTransaction::<init>, " + this);
+       }
+       
+       /**
+        * Releases every bean lock held by this transaction. No data is
+        * written or undone here; commit and rollback behave identically.
+        */
+       public void commit() {
+               releaseLocks();
        }
 
+       /**
+        * Releases every bean lock held by this transaction, exactly like commit().
+        */
        public void rollback() {
-
+               releaseLocks();
        }
        
+       /**
+        * @return true while this transaction holds at least one bean lock
+        */
        public boolean isActive () {
-               return false;
+               return !enlistedBeans.isEmpty();
        }
 
-}
\ No newline at end of file
+       /**
+        * Locks the given bean on behalf of this transaction, blocking while a
+        * different transaction holds it. A null bean, or a bean this
+        * transaction already holds, is left untouched.
+        *
+        * An interrupt received while waiting does not abort the wait; the
+        * thread's interrupt status is restored once the wait is over so the
+        * caller can still observe it.
+        *
+        * @param bean the bean to lock; may be null
+        */
+       public void enlist(RMBean bean) {
+               if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::enlist, " + bean);
+               if(bean != null) {
+                       // Remember an interrupt instead of swallowing it. Re-interrupting
+                       // inside the loop would make the next wait() throw immediately.
+                       boolean interrupted = false;
+                       synchronized (bean) {
+                               Transaction other = bean.getTransaction();
+                               while(other != null && other != this) {
+
+                                       // Waiting while already holding locks can deadlock. Only
+                                       // build the (stack-trace carrying) exception when it is logged.
+                                       if(!enlistedBeans.isEmpty() && log.isDebugEnabled()) {
+                                               Exception e = new Exception("Possible deadlock");
+                                               log.debug("Possible deadlock", e);
+                                               log.debug(this + ", " + bean);
+                                       }
+
+                                       try {
+                                               if(log.isDebugEnabled()) log.debug("This " + this + " waiting for " + other);
+                                               bean.wait();
+                                       } catch(InterruptedException e) {
+                                               interrupted = true;
+                                       }
+                                       other = bean.getTransaction();
+                               }
+                               // other is now either null (free) or this (already ours).
+                               if(other == null) {
+                                       if(log.isDebugEnabled()) log.debug(this + " locking bean");
+                                       bean.setTransaction(this);
+                                       enlistedBeans.add(bean);
+                               }
+                       }
+                       if(interrupted) {
+                               Thread.currentThread().interrupt();
+                       }
+               }
+               if(log.isDebugEnabled()) log.debug("Exit: InMemoryTransaction::enlist");
+       }
+       
+       /**
+        * Deregisters this transaction from the manager, then clears the owner
+        * of every enlisted bean and wakes the threads waiting on it.
+        */
+       private void releaseLocks() {
+               if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::releaseLocks, " + this);
+               manager.removeTransaction(this);
+
+               Iterator beans = enlistedBeans.iterator();
+               while(beans.hasNext()) {
+                       RMBean bean = (RMBean) beans.next();
+                       synchronized (bean) {
+                               bean.setTransaction(null);
+                               // Wake every waiter; each re-checks ownership in enlist()'s
+                               // loop, so no wake-up depends on which thread notify() picks.
+                               bean.notifyAll();
+                       }
+               }
+               enlistedBeans.clear();
+               
+               if(log.isDebugEnabled()) log.debug("Exit: InMemoryTransaction::releaseLocks");
+       }
+       
+       /**
+        * @return a diagnostic string with the key, thread name and number of held locks
+        */
+       public String toString() {
+               StringBuffer result = new StringBuffer();
+               result.append("[InMemoryTransaction #");
+               result.append(key);
+               result.append(", name: ");
+               result.append(threadName);
+               result.append(", locks: ");
+               result.append(enlistedBeans.size());
+               result.append("]");
+               return result.toString();
+       }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to