ozeigermann 2003/11/19 04:01:46
Modified: proposals/stores/org/apache/slide/store/impl/rdbms
AbstractRDBMSStore.java
Log:
Fixed deadlock problem with isolation level higher than read committed.
Was due to global connection not freeing locks.
Removed global connection and have a connection for every request outside a
transaction.
Revision Changes Path
1.2 +155 -100
jakarta-slide/proposals/stores/org/apache/slide/store/impl/rdbms/AbstractRDBMSStore.java
Index: AbstractRDBMSStore.java
===================================================================
RCS file:
/home/cvs/jakarta-slide/proposals/stores/org/apache/slide/store/impl/rdbms/AbstractRDBMSStore.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- AbstractRDBMSStore.java 14 Nov 2003 12:28:27 -0000 1.1
+++ AbstractRDBMSStore.java 19 Nov 2003 12:01:46 -0000 1.2
@@ -111,7 +111,7 @@
* @author <a href="mailto:[EMAIL PROTECTED]">Michael Smith</a>
* @author <a href="mailto:[EMAIL PROTECTED]">Ashok Kumar</a>
* @author <a href="mailto:[EMAIL PROTECTED]">Christopher Lenz</a>
- * @takingAllTheBlame <a href="mailto:[EMAIL PROTECTED]">Oliver Zeigermann</a>
+ * @author <a href="mailto:[EMAIL PROTECTED]">Oliver Zeigermann</a>
* @version $Revision$
*/
public abstract class AbstractRDBMSStore
@@ -121,40 +121,18 @@
// FIXED needs a log channel of its own
protected String LOG_CHANNEL = this.getClass().getName();
- // -------------------------------------------------------------- Constants
-
protected static final int TX_IDLE = 0;
protected static final int TX_PREPARED = 1;
- protected static final int TX_SUSPENDED = 1;
-
- /**
- * Size limit for the caches.
- */
- protected static final int CACHE_SIZE = 200;
-
- // ----------------------------------------------------- Instance Variables
-
- /**
- * A map of the active connections, keyed by thread.
- */
- protected Hashtable connectionMap = new Hashtable();
+ protected static final int TX_SUSPENDED = 2;
- /**
- * A connection used when we aren't associated with a transaction correctly.
- */
- protected static Connection globalConnection;
+ protected ThreadLocal activeTransactionContext = new ThreadLocal();
- /**
- *
- */
protected RDBMSAdapter adapter;
protected boolean alreadyInitialized = false;
- // -------------------------------------------------------- Service Methods
-
/**
* Initializes the data source with a set of parameters.
*
@@ -204,29 +182,13 @@
* @exception ServiceConnectionFailedException if the connection failed
*/
public synchronized void connect() throws ServiceConnectionFailedException {
-
- getLogger().log("Trying connect to data source", LOG_CHANNEL, Logger.DEBUG);
- try {
- globalConnection = getNewConnection();
- } catch (SQLException e) {
- throw new ServiceConnectionFailedException(this, "Couldn't get global
connection");
- }
- getLogger().log(
- "Done connecting to database. The global connection is " +
globalConnection,
- LOG_CHANNEL,
- Logger.DEBUG);
}
/**
* Returns connection status.
*/
public boolean isConnected() {
-
- try {
- return (globalConnection != null && !globalConnection.isClosed());
- } catch (SQLException e) {
- return false;
- }
+ return true;
}
/**
@@ -236,12 +198,6 @@
* failed
*/
public void disconnect() throws ServiceDisconnectionFailedException {
-
- try {
- globalConnection.close();
- } catch (SQLException e) {
- getLogger().log("Failed to close special global connection: " +
e.getMessage(), LOG_CHANNEL, Logger.ERROR);
- }
}
/**
@@ -274,7 +230,7 @@
public Xid[] recover(int flag) throws XAException {
getLogger().log("recover() for thread: " + Thread.currentThread(),
LOG_CHANNEL, Logger.DEBUG);
- TransactionId id = (TransactionId)
connectionMap.get(Thread.currentThread());
+ TransactionId id = getActiveTransactionContext();
if (id != null && id.status == TX_PREPARED) {
Xid[] xids = new Xid[1];
@@ -287,7 +243,7 @@
public int prepare(Xid xid) throws XAException {
getLogger().log("prepare() for thread: " + Thread.currentThread(),
LOG_CHANNEL, Logger.DEBUG);
- TransactionId id = (TransactionId)
connectionMap.get(Thread.currentThread());
+ TransactionId id = getActiveTransactionContext();
if (id == null)
throw new XAException(XAException.XAER_NOTA);
@@ -315,7 +271,7 @@
public void forget(Xid xid) throws XAException {
getLogger().log("forget() for thread: " + Thread.currentThread(),
LOG_CHANNEL, Logger.DEBUG);
- TransactionId id = (TransactionId)
connectionMap.get(Thread.currentThread());
+ TransactionId id = getActiveTransactionContext();
if (id == null || id.xid == null)
throw new XAException(XAException.XAER_NOTA);
@@ -329,13 +285,13 @@
getLogger().log("Couldn't close connection.", LOG_CHANNEL,
Logger.ERROR);
}
getLogger().log("forget(): removing from map: " + Thread.currentThread(),
LOG_CHANNEL, Logger.DEBUG);
- connectionMap.remove(Thread.currentThread());
+ activeTransactionContext.set(null);
}
public void end(Xid xid, int flags) throws XAException {
getLogger().log("end() for thread: " + Thread.currentThread(), LOG_CHANNEL,
Logger.DEBUG);
- TransactionId id = (TransactionId)
connectionMap.get(Thread.currentThread());
+ TransactionId id = getActiveTransactionContext();
if (id == null || id.xid == null)
throw new XAException(XAException.XAER_NOTA);
if (xid == null)
@@ -359,7 +315,7 @@
LOG_CHANNEL,
Logger.DEBUG);
- TransactionId id = (TransactionId)
connectionMap.remove(Thread.currentThread());
+ TransactionId id = getActiveTransactionContext();
if (id == null) {
getLogger().log(
"Error committing: no transaction associated with current thread",
@@ -385,12 +341,15 @@
throw new XAException(XAException.XAER_NOTA);
}
- // TAKEN FROM ASHOK START: this stuff should be committed
try {
- if (id.rollbackOnly)
+ if (id.rollbackOnly) {
+
conn.rollback();
- else
+ } else {
conn.commit();
+ }
+ activeTransactionContext.set(null);
+
} catch (Exception e) {
throw new XAException(XAException.XA_RBCOMMFAIL);
} finally {
@@ -402,7 +361,6 @@
getLogger().log(e, LOG_CHANNEL, Logger.ERROR);
}
}
- // TAKEN FROM ASHOK END
}
/**
@@ -416,7 +374,7 @@
LOG_CHANNEL,
Logger.DEBUG);
- TransactionId id = (TransactionId)
connectionMap.remove(Thread.currentThread());
+ TransactionId id = getActiveTransactionContext();
if (id == null) {
getLogger().log("No transaction associated with current thread, can't
rollback", LOG_CHANNEL, Logger.ERROR);
throw new XAException(XAException.XAER_NOTA);
@@ -430,9 +388,10 @@
Logger.ERROR);
throw new XAException(XAException.XAER_NOTA);
}
- // TAKEN FROM ASHOK START: this stuff should be rolled back
+
try {
conn.rollback();
+ activeTransactionContext.set(null);
} catch (SQLException e) {
throw new XAException(XAException.XA_HEURCOM);
} finally {
@@ -444,7 +403,6 @@
getLogger().log(e, LOG_CHANNEL, Logger.ERROR);
}
}
- // TAKEN FROM ASHOK END
}
/**
@@ -453,17 +411,21 @@
public void start(Xid xid, int flags) throws XAException {
getLogger().log("start(): beginning transaction with xid " + xid,
LOG_CHANNEL, Logger.DEBUG);
- TransactionId id = (TransactionId)
connectionMap.get(Thread.currentThread());
+ TransactionId id = getActiveTransactionContext();
switch (flags) {
case XAResource.TMNOFLAGS :
if (id != null)
throw new XAException(XAException.XAER_INVAL);
- id = new TransactionId(xid, TX_IDLE);
+ try {
+ id = new TransactionId(xid, TX_IDLE);
+ } catch (SQLException e) {
+ throw new XAException(XAException.XAER_RMFAIL); // XXX or is it
an error?
+ }
getLogger().log("start(): adding to map for " +
Thread.currentThread(), LOG_CHANNEL, Logger.DEBUG);
- connectionMap.put(Thread.currentThread(), id);
+ activeTransactionContext.set(id);
break;
case XAResource.TMJOIN :
getLogger().log(
@@ -498,7 +460,25 @@
*/
public ObjectNode retrieveObject(Uri uri) throws ServiceAccessException,
ObjectNotFoundException {
- return adapter.retrieveObject(getCurrentConnection(), uri);
+ if (getActiveTransactionContext() == null) {
+ Connection connection = null;
+ try {
+ connection = getNewConnection();
+ return adapter.retrieveObject(connection, uri);
+ } catch (SQLException e) {
+ throw new ServiceAccessException(this, e);
+ } finally {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ getLogger().log(e, LOG_CHANNEL, Logger.WARNING);
+ }
+ }
+ }
+ } else {
+ return adapter.retrieveObject(getCurrentConnection(), uri);
+ }
}
/**
@@ -581,8 +561,25 @@
* @exception ServiceAccessException Error accessing the Service
*/
public Enumeration enumeratePermissions(Uri uri) throws ServiceAccessException {
-
- return adapter.enumeratePermissions(getCurrentConnection(), uri);
+ if (getActiveTransactionContext() == null) {
+ Connection connection = null;
+ try {
+ connection = getNewConnection();
+ return adapter.enumeratePermissions(connection, uri);
+ } catch (SQLException e) {
+ throw new ServiceAccessException(this, e);
+ } finally {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ getLogger().log(e, LOG_CHANNEL, Logger.WARNING);
+ }
+ }
+ }
+ } else {
+ return adapter.enumeratePermissions(getCurrentConnection(), uri);
+ }
}
// ----------------------------------------------- LockStore Implementation
@@ -642,8 +639,25 @@
* @exception ServiceAccessException Service access error
*/
public Enumeration enumerateLocks(Uri uri) throws ServiceAccessException {
-
- return adapter.enumerateLocks(getCurrentConnection(), uri);
+ if (getActiveTransactionContext() == null) {
+ Connection connection = null;
+ try {
+ connection = getNewConnection();
+ return adapter.enumerateLocks(connection, uri);
+ } catch (SQLException e) {
+ throw new ServiceAccessException(this, e);
+ } finally {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ getLogger().log(e, LOG_CHANNEL, Logger.WARNING);
+ }
+ }
+ }
+ } else {
+ return adapter.enumerateLocks(getCurrentConnection(), uri);
+ }
}
// -------------------------------- RevisionDescriptorsStore Implementation
@@ -658,8 +672,25 @@
*/
public NodeRevisionDescriptors retrieveRevisionDescriptors(Uri uri)
throws ServiceAccessException, RevisionDescriptorNotFoundException {
-
- return adapter.retrieveRevisionDescriptors(getCurrentConnection(), uri);
+ if (getActiveTransactionContext() == null) {
+ Connection connection = null;
+ try {
+ connection = getNewConnection();
+ return adapter.retrieveRevisionDescriptors(connection, uri);
+ } catch (SQLException e) {
+ throw new ServiceAccessException(this, e);
+ } finally {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ getLogger().log(e, LOG_CHANNEL, Logger.WARNING);
+ }
+ }
+ }
+ } else {
+ return adapter.retrieveRevisionDescriptors(getCurrentConnection(), uri);
+ }
}
/**
@@ -711,8 +742,25 @@
*/
public NodeRevisionDescriptor retrieveRevisionDescriptor(Uri uri,
NodeRevisionNumber revisionNumber)
throws ServiceAccessException, RevisionDescriptorNotFoundException {
-
- return adapter.retrieveRevisionDescriptor(getCurrentConnection(), uri,
revisionNumber);
+ if (getActiveTransactionContext() == null) {
+ Connection connection = null;
+ try {
+ connection = getNewConnection();
+ return adapter.retrieveRevisionDescriptor(connection, uri,
revisionNumber);
+ } catch (SQLException e) {
+ throw new ServiceAccessException(this, e);
+ } finally {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ getLogger().log(e, LOG_CHANNEL, Logger.WARNING);
+ }
+ }
+ }
+ } else {
+ return adapter.retrieveRevisionDescriptor(getCurrentConnection(), uri,
revisionNumber);
+ }
}
/**
@@ -765,8 +813,26 @@
*/
public NodeRevisionContent retrieveRevisionContent(Uri uri,
NodeRevisionDescriptor revisionDescriptor)
throws ServiceAccessException, RevisionNotFoundException {
-
- return adapter.retrieveRevisionContent(getCurrentConnection(), uri,
revisionDescriptor);
+ if (getActiveTransactionContext() == null) {
+ Connection connection = null;
+ try {
+ connection = getNewConnection();
+ return adapter.retrieveRevisionContent(connection, uri,
revisionDescriptor);
+ } catch (SQLException e) {
+ throw new ServiceAccessException(this, e);
+ } finally {
+ if (connection != null) {
+ try {
+ // FIXME this really should not work, as we might have a
stream not yet closed when compression if turned off
+ connection.close();
+ } catch (SQLException e) {
+ getLogger().log(e, LOG_CHANNEL, Logger.WARNING);
+ }
+ }
+ }
+ } else {
+ return adapter.retrieveRevisionContent(getCurrentConnection(), uri,
revisionDescriptor);
+ }
}
/**
@@ -818,23 +884,20 @@
/**
* Get the Connection object associated with the current transaction.
*/
- protected Connection getCurrentConnection() {
+ protected Connection getCurrentConnection() throws ServiceAccessException {
getLogger().log("Getting current connection for thread " +
Thread.currentThread(), LOG_CHANNEL, Logger.DEBUG);
- TransactionId id = (TransactionId)
connectionMap.get(Thread.currentThread());
+ TransactionId id = getActiveTransactionContext();
if (id == null) {
getLogger().log("No id for current thread - called outside
transaction?", LOG_CHANNEL, Logger.DEBUG);
- return globalConnection;
- }
-
- Connection conn = id.connection;
- if (conn == null) {
- getLogger().log("No connection for current id - shouldn't be possible",
LOG_CHANNEL, Logger.ERROR);
- return globalConnection;
+ return null;
}
+ return id.connection;
+ }
- getLogger().log("Returning current valid connection from map", LOG_CHANNEL,
Logger.DEBUG);
- return conn;
+ protected TransactionId getActiveTransactionContext() {
+ Object txId = activeTransactionContext.get();
+ return (TransactionId) txId;
}
abstract protected Connection getNewConnection() throws SQLException;
@@ -847,20 +910,12 @@
boolean rollbackOnly;
Connection connection;
- TransactionId(Xid xid, int status) {
+ TransactionId(Xid xid, int status) throws SQLException {
this.xid = xid;
this.status = status;
this.rollbackOnly = false;
- try {
- connection = getNewConnection();
- if (connection == null) {
- connection = globalConnection;
- return;
- }
- } catch (SQLException e) {
- connection = globalConnection;
- }
+ connection = getNewConnection();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]