ozeigermann    2003/11/19 04:01:46

  Modified:    proposals/stores/org/apache/slide/store/impl/rdbms
                        AbstractRDBMSStore.java
  Log:
  Fixed deadlock problem with isolation level higher than read committed. 
  Was due to global connection not freeing locks.
  Removed global connection and have a connection for every request outside a 
transaction.
  
  Revision  Changes    Path
  1.2       +155 -100  
jakarta-slide/proposals/stores/org/apache/slide/store/impl/rdbms/AbstractRDBMSStore.java
  
  Index: AbstractRDBMSStore.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-slide/proposals/stores/org/apache/slide/store/impl/rdbms/AbstractRDBMSStore.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- AbstractRDBMSStore.java   14 Nov 2003 12:28:27 -0000      1.1
  +++ AbstractRDBMSStore.java   19 Nov 2003 12:01:46 -0000      1.2
  @@ -111,7 +111,7 @@
    * @author <a href="mailto:[EMAIL PROTECTED]">Michael Smith</a>
    * @author <a href="mailto:[EMAIL PROTECTED]">Ashok Kumar</a>
    * @author <a href="mailto:[EMAIL PROTECTED]">Christopher Lenz</a>
  - * @takingAllTheBlame <a href="mailto:[EMAIL PROTECTED]">Oliver Zeigermann</a> 
  + * @author <a href="mailto:[EMAIL PROTECTED]">Oliver Zeigermann</a> 
    * @version $Revision$
    */
   public abstract class AbstractRDBMSStore
  @@ -121,40 +121,18 @@
       // FIXED needs a log channel of its own
       protected String LOG_CHANNEL = this.getClass().getName();
   
  -    // -------------------------------------------------------------- Constants
  -
       protected static final int TX_IDLE = 0;
   
       protected static final int TX_PREPARED = 1;
   
  -    protected static final int TX_SUSPENDED = 1;
  -
  -    /**
  -     * Size limit for the caches.
  -     */
  -    protected static final int CACHE_SIZE = 200;
  -
  -    // ----------------------------------------------------- Instance Variables
  -
  -    /**
  -     * A map of the active connections, keyed by thread.
  -     */
  -    protected Hashtable connectionMap = new Hashtable();
  +    protected static final int TX_SUSPENDED = 2;
   
  -    /**
  -     * A connection used when we aren't associated with a transaction correctly.
  -     */
  -    protected static Connection globalConnection;
  +    protected ThreadLocal activeTransactionContext = new ThreadLocal();
   
  -    /**
  -     * 
  -     */
       protected RDBMSAdapter adapter;
   
       protected boolean alreadyInitialized = false;
   
  -    // -------------------------------------------------------- Service Methods
  -
       /**
        * Initializes the data source with a set of parameters.
        *
  @@ -204,29 +182,13 @@
        * @exception ServiceConnectionFailedException if the connection failed
        */
       public synchronized void connect() throws ServiceConnectionFailedException {
  -
  -        getLogger().log("Trying connect to data source", LOG_CHANNEL, Logger.DEBUG);
  -        try {
  -            globalConnection = getNewConnection();
  -        } catch (SQLException e) {
  -            throw new ServiceConnectionFailedException(this, "Couldn't get global 
connection");
  -        }
  -        getLogger().log(
  -            "Done connecting to database. The global connection is " + 
globalConnection,
  -            LOG_CHANNEL,
  -            Logger.DEBUG);
       }
   
       /**
        * Returns connection status.
        */
       public boolean isConnected() {
  -
  -        try {
  -            return (globalConnection != null && !globalConnection.isClosed());
  -        } catch (SQLException e) {
  -            return false;
  -        }
  +        return true;
       }
   
       /**
  @@ -236,12 +198,6 @@
        *            failed
        */
       public void disconnect() throws ServiceDisconnectionFailedException {
  -
  -        try {
  -            globalConnection.close();
  -        } catch (SQLException e) {
  -            getLogger().log("Failed to close special global connection: " + 
e.getMessage(), LOG_CHANNEL, Logger.ERROR);
  -        }
       }
   
       /**
  @@ -274,7 +230,7 @@
       public Xid[] recover(int flag) throws XAException {
   
           getLogger().log("recover() for thread: " + Thread.currentThread(), 
LOG_CHANNEL, Logger.DEBUG);
  -        TransactionId id = (TransactionId) 
connectionMap.get(Thread.currentThread());
  +        TransactionId id = getActiveTransactionContext();
   
           if (id != null && id.status == TX_PREPARED) {
               Xid[] xids = new Xid[1];
  @@ -287,7 +243,7 @@
       public int prepare(Xid xid) throws XAException {
   
           getLogger().log("prepare() for thread: " + Thread.currentThread(), 
LOG_CHANNEL, Logger.DEBUG);
  -        TransactionId id = (TransactionId) 
connectionMap.get(Thread.currentThread());
  +        TransactionId id = getActiveTransactionContext();
   
           if (id == null)
               throw new XAException(XAException.XAER_NOTA);
  @@ -315,7 +271,7 @@
       public void forget(Xid xid) throws XAException {
   
           getLogger().log("forget() for thread: " + Thread.currentThread(), 
LOG_CHANNEL, Logger.DEBUG);
  -        TransactionId id = (TransactionId) 
connectionMap.get(Thread.currentThread());
  +        TransactionId id = getActiveTransactionContext();
   
           if (id == null || id.xid == null)
               throw new XAException(XAException.XAER_NOTA);
  @@ -329,13 +285,13 @@
               getLogger().log("Couldn't close connection.", LOG_CHANNEL, 
Logger.ERROR);
           }
           getLogger().log("forget(): removing from map: " + Thread.currentThread(), 
LOG_CHANNEL, Logger.DEBUG);
  -        connectionMap.remove(Thread.currentThread());
  +        activeTransactionContext.set(null);
       }
   
       public void end(Xid xid, int flags) throws XAException {
   
           getLogger().log("end() for thread: " + Thread.currentThread(), LOG_CHANNEL, 
Logger.DEBUG);
  -        TransactionId id = (TransactionId) 
connectionMap.get(Thread.currentThread());
  +        TransactionId id = getActiveTransactionContext();
           if (id == null || id.xid == null)
               throw new XAException(XAException.XAER_NOTA);
           if (xid == null)
  @@ -359,7 +315,7 @@
               LOG_CHANNEL,
               Logger.DEBUG);
   
  -        TransactionId id = (TransactionId) 
connectionMap.remove(Thread.currentThread());
  +        TransactionId id = getActiveTransactionContext();
           if (id == null) {
               getLogger().log(
                   "Error committing: no transaction associated with current thread",
  @@ -385,12 +341,15 @@
               throw new XAException(XAException.XAER_NOTA);
           }
   
  -        // TAKEN FROM ASHOK START: this stuff should be committed
           try {
  -            if (id.rollbackOnly)
  +            if (id.rollbackOnly) {
  +
                   conn.rollback();
  -            else
  +            } else {
                   conn.commit();
  +            }
  +            activeTransactionContext.set(null);
  +
           } catch (Exception e) {
               throw new XAException(XAException.XA_RBCOMMFAIL);
           } finally {
  @@ -402,7 +361,6 @@
                   getLogger().log(e, LOG_CHANNEL, Logger.ERROR);
               }
           }
  -        // TAKEN FROM ASHOK END
       }
   
       /**
  @@ -416,7 +374,7 @@
               LOG_CHANNEL,
               Logger.DEBUG);
   
  -        TransactionId id = (TransactionId) 
connectionMap.remove(Thread.currentThread());
  +        TransactionId id = getActiveTransactionContext();
           if (id == null) {
               getLogger().log("No transaction associated with current thread, can't 
rollback", LOG_CHANNEL, Logger.ERROR);
               throw new XAException(XAException.XAER_NOTA);
  @@ -430,9 +388,10 @@
                   Logger.ERROR);
               throw new XAException(XAException.XAER_NOTA);
           }
  -        // TAKEN FROM ASHOK START: this stuff should be rolled back
  +
           try {
               conn.rollback();
  +            activeTransactionContext.set(null);
           } catch (SQLException e) {
               throw new XAException(XAException.XA_HEURCOM);
           } finally {
  @@ -444,7 +403,6 @@
                   getLogger().log(e, LOG_CHANNEL, Logger.ERROR);
               }
           }
  -        // TAKEN FROM ASHOK END
       }
   
       /**
  @@ -453,17 +411,21 @@
       public void start(Xid xid, int flags) throws XAException {
           getLogger().log("start(): beginning transaction with xid " + xid, 
LOG_CHANNEL, Logger.DEBUG);
   
  -        TransactionId id = (TransactionId) 
connectionMap.get(Thread.currentThread());
  +        TransactionId id = getActiveTransactionContext();
   
           switch (flags) {
               case XAResource.TMNOFLAGS :
                   if (id != null)
                       throw new XAException(XAException.XAER_INVAL);
  -                id = new TransactionId(xid, TX_IDLE);
  +                try {
  +                    id = new TransactionId(xid, TX_IDLE);
  +                } catch (SQLException e) {
  +                    throw new XAException(XAException.XAER_RMFAIL); // XXX or is it 
an error?
  +                }
   
                   getLogger().log("start(): adding to map for " + 
Thread.currentThread(), LOG_CHANNEL, Logger.DEBUG);
   
  -                connectionMap.put(Thread.currentThread(), id);
  +                activeTransactionContext.set(id);
                   break;
               case XAResource.TMJOIN :
                   getLogger().log(
  @@ -498,7 +460,25 @@
        */
       public ObjectNode retrieveObject(Uri uri) throws ServiceAccessException, 
ObjectNotFoundException {
   
  -        return adapter.retrieveObject(getCurrentConnection(), uri);
  +        if (getActiveTransactionContext() == null) {
  +            Connection connection = null;
  +            try {
  +                connection = getNewConnection();
  +                return adapter.retrieveObject(connection, uri);
  +            } catch (SQLException e) {
  +                throw new ServiceAccessException(this, e);
  +            } finally {
  +                if (connection != null) {
  +                    try {
  +                        connection.close();
  +                    } catch (SQLException e) {
  +                        getLogger().log(e, LOG_CHANNEL, Logger.WARNING);
  +                    }
  +                }
  +            }
  +        } else {
  +            return adapter.retrieveObject(getCurrentConnection(), uri);
  +        }
       }
   
       /**
  @@ -581,8 +561,25 @@
        * @exception ServiceAccessException Error accessing the Service
        */
       public Enumeration enumeratePermissions(Uri uri) throws ServiceAccessException {
  -
  -        return adapter.enumeratePermissions(getCurrentConnection(), uri);
  +        if (getActiveTransactionContext() == null) {
  +            Connection connection = null;
  +            try {
  +                connection = getNewConnection();
  +                return adapter.enumeratePermissions(connection, uri);
  +            } catch (SQLException e) {
  +                throw new ServiceAccessException(this, e);
  +            } finally {
  +                if (connection != null) {
  +                    try {
  +                        connection.close();
  +                    } catch (SQLException e) {
  +                        getLogger().log(e, LOG_CHANNEL, Logger.WARNING);
  +                    }
  +                }
  +            }
  +        } else {
  +            return adapter.enumeratePermissions(getCurrentConnection(), uri);
  +        }
       }
   
       // ----------------------------------------------- LockStore Implementation
  @@ -642,8 +639,25 @@
        * @exception ServiceAccessException Service access error
        */
       public Enumeration enumerateLocks(Uri uri) throws ServiceAccessException {
  -
  -        return adapter.enumerateLocks(getCurrentConnection(), uri);
  +        if (getActiveTransactionContext() == null) {
  +            Connection connection = null;
  +            try {
  +                connection = getNewConnection();
  +                return adapter.enumerateLocks(connection, uri);
  +            } catch (SQLException e) {
  +                throw new ServiceAccessException(this, e);
  +            } finally {
  +                if (connection != null) {
  +                    try {
  +                        connection.close();
  +                    } catch (SQLException e) {
  +                        getLogger().log(e, LOG_CHANNEL, Logger.WARNING);
  +                    }
  +                }
  +            }
  +        } else {
  +            return adapter.enumerateLocks(getCurrentConnection(), uri);
  +        }
       }
   
       // -------------------------------- RevisionDescriptorsStore Implementation
  @@ -658,8 +672,25 @@
        */
       public NodeRevisionDescriptors retrieveRevisionDescriptors(Uri uri)
           throws ServiceAccessException, RevisionDescriptorNotFoundException {
  -
  -        return adapter.retrieveRevisionDescriptors(getCurrentConnection(), uri);
  +        if (getActiveTransactionContext() == null) {
  +            Connection connection = null;
  +            try {
  +                connection = getNewConnection();
  +                return adapter.retrieveRevisionDescriptors(connection, uri);
  +            } catch (SQLException e) {
  +                throw new ServiceAccessException(this, e);
  +            } finally {
  +                if (connection != null) {
  +                    try {
  +                        connection.close();
  +                    } catch (SQLException e) {
  +                        getLogger().log(e, LOG_CHANNEL, Logger.WARNING);
  +                    }
  +                }
  +            }
  +        } else {
  +            return adapter.retrieveRevisionDescriptors(getCurrentConnection(), uri);
  +        }
       }
   
       /**
  @@ -711,8 +742,25 @@
        */
       public NodeRevisionDescriptor retrieveRevisionDescriptor(Uri uri, 
NodeRevisionNumber revisionNumber)
           throws ServiceAccessException, RevisionDescriptorNotFoundException {
  -
  -        return adapter.retrieveRevisionDescriptor(getCurrentConnection(), uri, 
revisionNumber);
  +        if (getActiveTransactionContext() == null) {
  +            Connection connection = null;
  +            try {
  +                connection = getNewConnection();
  +                return adapter.retrieveRevisionDescriptor(connection, uri, 
revisionNumber);
  +            } catch (SQLException e) {
  +                throw new ServiceAccessException(this, e);
  +            } finally {
  +                if (connection != null) {
  +                    try {
  +                        connection.close();
  +                    } catch (SQLException e) {
  +                        getLogger().log(e, LOG_CHANNEL, Logger.WARNING);
  +                    }
  +                }
  +            }
  +        } else {
  +            return adapter.retrieveRevisionDescriptor(getCurrentConnection(), uri, 
revisionNumber);
  +        }
       }
   
       /**
  @@ -765,8 +813,26 @@
        */
       public NodeRevisionContent retrieveRevisionContent(Uri uri, 
NodeRevisionDescriptor revisionDescriptor)
           throws ServiceAccessException, RevisionNotFoundException {
  -
  -        return adapter.retrieveRevisionContent(getCurrentConnection(), uri, 
revisionDescriptor);
  +        if (getActiveTransactionContext() == null) {
  +            Connection connection = null;
  +            try {
  +                connection = getNewConnection();
  +                return adapter.retrieveRevisionContent(connection, uri, 
revisionDescriptor);
  +            } catch (SQLException e) {
  +                throw new ServiceAccessException(this, e);
  +            } finally {
  +                if (connection != null) {
  +                    try {
  +                        // FIXME this really should not work, as we might have a 
stream not yet closed when compression if turned off
  +                        connection.close();
  +                    } catch (SQLException e) {
  +                        getLogger().log(e, LOG_CHANNEL, Logger.WARNING);
  +                    }
  +                }
  +            }
  +        } else {
  +            return adapter.retrieveRevisionContent(getCurrentConnection(), uri, 
revisionDescriptor);
  +        }
       }
   
       /**
  @@ -818,23 +884,20 @@
       /** 
        * Get the Connection object associated with the current transaction.
        */
  -    protected Connection getCurrentConnection() {
  +    protected Connection getCurrentConnection() throws ServiceAccessException {
   
           getLogger().log("Getting current connection for thread " + 
Thread.currentThread(), LOG_CHANNEL, Logger.DEBUG);
  -        TransactionId id = (TransactionId) 
connectionMap.get(Thread.currentThread());
  +        TransactionId id = getActiveTransactionContext();
           if (id == null) {
               getLogger().log("No id for current thread - called outside 
transaction?", LOG_CHANNEL, Logger.DEBUG);
  -            return globalConnection;
  -        }
  -
  -        Connection conn = id.connection;
  -        if (conn == null) {
  -            getLogger().log("No connection for current id - shouldn't be possible", 
LOG_CHANNEL, Logger.ERROR);
  -            return globalConnection;
  +            return null;
           }
  +        return id.connection;
  +    }
   
  -        getLogger().log("Returning current valid connection from map", LOG_CHANNEL, 
Logger.DEBUG);
  -        return conn;
  +    protected TransactionId getActiveTransactionContext() {
  +        Object txId = activeTransactionContext.get();
  +        return (TransactionId) txId;
       }
   
       abstract protected Connection getNewConnection() throws SQLException;
  @@ -847,20 +910,12 @@
           boolean rollbackOnly;
           Connection connection;
   
  -        TransactionId(Xid xid, int status) {
  +        TransactionId(Xid xid, int status) throws SQLException {
               this.xid = xid;
               this.status = status;
               this.rollbackOnly = false;
   
  -            try {
  -                connection = getNewConnection();
  -                if (connection == null) {
  -                    connection = globalConnection;
  -                    return;
  -                }
  -            } catch (SQLException e) {
  -                connection = globalConnection;
  -            }
  +            connection = getNewConnection();
           }
   
       }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to