Author: reschke
Date: Mon Mar 31 21:07:28 2014
New Revision: 1583465
URL: http://svn.apache.org/r1583465
Log:
OAK-1533 - switch RDBBlobStore to leverage DataSource
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java?rev=1583465&r1=1583464&r2=1583465&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java Mon Mar 31 21:07:28 2014
@@ -47,7 +47,7 @@ public class RDBBlobStore extends Cachin
try {
String jdbcurl = "jdbc:h2:mem:oaknodes";
DataSource ds = RDBDataSourceFactory.forJdbcUrl(jdbcurl, "sa", "");
- initialize(ds.getConnection());
+ initialize(ds);
} catch (Exception ex) {
throw new MicroKernelException("initializing RDB blob store", ex);
}
@@ -60,7 +60,7 @@ public class RDBBlobStore extends Cachin
public RDBBlobStore(String jdbcurl, String username, String password) {
try {
DataSource ds = RDBDataSourceFactory.forJdbcUrl(jdbcurl, username,
password);
- initialize(ds.getConnection());
+ initialize(ds);
} catch (Exception ex) {
throw new MicroKernelException("initializing RDB blob store", ex);
}
@@ -72,7 +72,7 @@ public class RDBBlobStore extends Cachin
*/
public RDBBlobStore(DataSource ds) {
try {
- initialize(ds.getConnection());
+ initialize(ds);
} catch (Exception ex) {
throw new MicroKernelException("initializing RDB blob store", ex);
}
@@ -80,17 +80,12 @@ public class RDBBlobStore extends Cachin
@Override
public void close() {
- try {
- this.connection.close();
- this.connection = null;
- } catch (SQLException ex) {
- throw new MicroKernelException(ex);
- }
+ this.ds = null;
}
@Override
public void finalize() {
- if (this.connection != null && this.callStack != null) {
+ if (this.ds != null && this.callStack != null) {
LOG.debug("finalizing RDBDocumentStore that was not disposed",
this.callStack);
}
}
@@ -99,47 +94,55 @@ public class RDBBlobStore extends Cachin
private Exception callStack;
- private Connection connection;
-
- private void initialize(Connection con) throws Exception {
- con.setAutoCommit(false);
+ private DataSource ds;
- for (String tableName : new String[] { "DATASTORE_META",
"DATASTORE_DATA" }) {
- try {
- PreparedStatement stmt = con.prepareStatement("select ID from
" + tableName + " where ID = ?");
- stmt.setString(1, "0");
- stmt.executeQuery();
- } catch (SQLException ex) {
- // table does not appear to exist
- con.rollback();
+ private void initialize(DataSource ds) throws Exception {
- String dbtype = con.getMetaData().getDatabaseProductName();
- LOG.info("Attempting to create table " + tableName + " in " +
dbtype);
+ this.ds = ds;
+ Connection con = ds.getConnection();
- Statement stmt = con.createStatement();
+ try {
+ con.setAutoCommit(false);
- if (tableName.equals("DATASTORE_META")) {
- stmt.execute("create table " + tableName
- + " (ID varchar(1000) not null primary key, LEVEL
int, LASTMOD bigint)");
- } else {
- // the code below likely will need to be extended for new
- // database types
- if ("PostgreSQL".equals(dbtype)) {
- stmt.execute("create table " + tableName + " (ID
varchar(1000) not null primary key, DATA bytea)");
- } else if ("DB2".equals(dbtype) || (dbtype != null &&
dbtype.startsWith("DB2/"))) {
- stmt.execute("create table " + tableName + " (ID
varchar(1000) not null primary key, DATA blob)");
+ for (String tableName : new String[] { "DATASTORE_META",
"DATASTORE_DATA" }) {
+ try {
+ PreparedStatement stmt = con.prepareStatement("select ID
from " + tableName + " where ID = ?");
+ stmt.setString(1, "0");
+ stmt.executeQuery();
+ } catch (SQLException ex) {
+ // table does not appear to exist
+ con.rollback();
+
+ String dbtype = con.getMetaData().getDatabaseProductName();
+ LOG.info("Attempting to create table " + tableName + " in
" + dbtype);
+
+ Statement stmt = con.createStatement();
+
+ if (tableName.equals("DATASTORE_META")) {
+ stmt.execute("create table " + tableName
+ + " (ID varchar(1000) not null primary key,
LEVEL int, LASTMOD bigint)");
} else {
- stmt.execute("create table " + tableName + " (ID
varchar(1000) not null primary key, DATA blob)");
+ // the code below likely will need to be extended for
+ // new
+ // database types
+ if ("PostgreSQL".equals(dbtype)) {
+ stmt.execute("create table " + tableName + " (ID
varchar(1000) not null primary key, DATA bytea)");
+ } else if ("DB2".equals(dbtype) || (dbtype != null &&
dbtype.startsWith("DB2/"))) {
+ stmt.execute("create table " + tableName + " (ID
varchar(1000) not null primary key, DATA blob)");
+ } else {
+ stmt.execute("create table " + tableName + " (ID
varchar(1000) not null primary key, DATA blob)");
+ }
}
- }
- stmt.close();
+ stmt.close();
- con.commit();
+ con.commit();
+ }
}
+ } finally {
+ // con.close();
}
- this.connection = con;
this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of
RDBBlobStore creation") : null;
}
@@ -155,11 +158,14 @@ public class RDBBlobStore extends Cachin
}
private void storeBlockInDatabase(byte[] digest, int level, byte[] data)
throws SQLException {
+
String id = StringUtils.convertBytesToHex(digest);
cache.put(id, data);
+ Connection con = ds.getConnection();
+
try {
long now = System.currentTimeMillis();
- PreparedStatement prep = connection.prepareStatement("update
datastore_meta set lastMod = ? where id = ?");
+ PreparedStatement prep = con.prepareStatement("update
datastore_meta set lastMod = ? where id = ?");
int count;
try {
prep.setLong(1, now);
@@ -170,7 +176,7 @@ public class RDBBlobStore extends Cachin
}
if (count == 0) {
try {
- prep = connection.prepareStatement("insert into
datastore_data(id, data) values(?, ?)");
+ prep = con.prepareStatement("insert into
datastore_data(id, data) values(?, ?)");
try {
prep.setString(1, id);
prep.setBytes(2, data);
@@ -182,7 +188,7 @@ public class RDBBlobStore extends Cachin
// already exists - ok
}
try {
- prep = connection.prepareStatement("insert into
datastore_meta(id, level, lastMod) values(?, ?, ?)");
+ prep = con.prepareStatement("insert into
datastore_meta(id, level, lastMod) values(?, ?, ?)");
try {
prep.setString(1, id);
prep.setInt(2, level);
@@ -196,16 +202,20 @@ public class RDBBlobStore extends Cachin
}
}
} finally {
- connection.commit();
+ con.commit();
+ con.close();
}
}
@Override
protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
+
String id = StringUtils.convertBytesToHex(blockId.getDigest());
byte[] data = cache.get(id);
+ Connection con = ds.getConnection();
+
try {
- PreparedStatement prep = connection.prepareStatement("select data
from datastore_data where id = ?");
+ PreparedStatement prep = con.prepareStatement("select data from
datastore_data where id = ?");
try {
prep.setString(1, id);
ResultSet rs = prep.executeQuery();
@@ -218,7 +228,8 @@ public class RDBBlobStore extends Cachin
}
cache.put(id, data);
} finally {
- connection.commit();
+ con.commit();
+ con.close();
}
// System.out.println(" read block " + id + " blockLen: " +
// data.length + " [0]: " + data[0]);
@@ -247,20 +258,21 @@ public class RDBBlobStore extends Cachin
@Override
protected void mark(BlockId blockId) throws Exception {
+ Connection con = ds.getConnection();
try {
if (minLastModified == 0) {
return;
}
String id = StringUtils.convertBytesToHex(blockId.getDigest());
- PreparedStatement prep = connection
- .prepareStatement("update datastore_meta set lastMod = ?
where id = ? and lastMod < ?");
+ PreparedStatement prep = con.prepareStatement("update
datastore_meta set lastMod = ? where id = ? and lastMod < ?");
prep.setLong(1, System.currentTimeMillis());
prep.setString(2, id);
prep.setLong(3, minLastModified);
prep.executeUpdate();
prep.close();
} finally {
- connection.commit();
+ con.commit();
+ con.close();
}
}
@@ -274,17 +286,18 @@ public class RDBBlobStore extends Cachin
}
private int sweepFromDatabase() throws SQLException {
+ Connection con = ds.getConnection();
try {
int count = 0;
- PreparedStatement prep = connection.prepareStatement("select id
from datastore_meta where lastMod < ?");
+ PreparedStatement prep = con.prepareStatement("select id from
datastore_meta where lastMod < ?");
prep.setLong(1, minLastModified);
ResultSet rs = prep.executeQuery();
ArrayList<String> ids = new ArrayList<String>();
while (rs.next()) {
ids.add(rs.getString(1));
}
- prep = connection.prepareStatement("delete from datastore_meta
where id = ?");
- PreparedStatement prepData = connection.prepareStatement("delete
from datastore_data where id = ?");
+ prep = con.prepareStatement("delete from datastore_meta where id =
?");
+ PreparedStatement prepData = con.prepareStatement("delete from
datastore_data where id = ?");
for (String id : ids) {
prep.setString(1, id);
prep.execute();
@@ -297,12 +310,14 @@ public class RDBBlobStore extends Cachin
minLastModified = 0;
return count;
} finally {
- connection.commit();
+ con.commit();
+ con.close();
}
}
@Override
public boolean deleteChunks(List<String> chunkIds, long
maxLastModifiedTime) throws Exception {
+ Connection con = ds.getConnection();
try {
PreparedStatement prep = null;
PreparedStatement prepData = null;
@@ -317,25 +332,25 @@ public class RDBBlobStore extends Cachin
}
if (maxLastModifiedTime > 0) {
- prep = connection.prepareStatement(
+ prep = con.prepareStatement(
"delete from datastore_meta where id in ("
+ inClause.toString() + ") and lastMod <= ?");
prep.setLong(batch + 1, maxLastModifiedTime);
- prepData = connection.prepareStatement(
+ prepData = con.prepareStatement(
"delete from datastore_data where id in ("
+ inClause.toString() + ") and lastMod <= ?");
prepData.setLong(batch + 1, maxLastModifiedTime);
} else {
- prep = connection.prepareStatement(
+ prep = con.prepareStatement(
"delete from datastore_meta where id in ("
+ inClause.toString() + ")");
- prepData = connection.prepareStatement(
+ prepData = con.prepareStatement(
"delete from datastore_data where id in ("
+ inClause.toString() + ")");
}
-
+
for (int idx = 0; idx < batch; idx++) {
prep.setString(idx + 1, chunkIds.get(idx));
prepData.setString(idx + 1, chunkIds.get(idx));
@@ -346,7 +361,8 @@ public class RDBBlobStore extends Cachin
prep.close();
prepData.close();
} finally {
- connection.commit();
+ con.commit();
+ con.close();
}
return true;
@@ -354,7 +370,7 @@ public class RDBBlobStore extends Cachin
@Override
public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws
Exception {
- return new ChunkIdIterator(this.connection, maxLastModifiedTime);
+ return new ChunkIdIterator(this.ds, maxLastModifiedTime);
}
@@ -364,14 +380,14 @@ public class RDBBlobStore extends Cachin
private static class ChunkIdIterator extends AbstractIterator<String> {
private long maxLastModifiedTime;
- private Connection connection;
+ private DataSource ds;
private static int BATCHSIZE = 1024 * 256;
private List<String> results = new LinkedList<String>();
private String lastId = null;
- public ChunkIdIterator(Connection connection, long
maxLastModifiedTime) {
+ public ChunkIdIterator(DataSource ds, long maxLastModifiedTime) {
this.maxLastModifiedTime = maxLastModifiedTime;
- this.connection = connection;
+ this.ds = ds;
}
@Override
@@ -403,7 +419,9 @@ public class RDBBlobStore extends Cachin
}
query.append(" order by id limit " + BATCHSIZE);
+ Connection connection = null;
try {
+ connection = ds.getConnection();
try {
PreparedStatement prep =
connection.prepareStatement(query.toString());
int idx = 1;
@@ -422,10 +440,14 @@ public class RDBBlobStore extends Cachin
return !results.isEmpty();
} finally {
connection.commit();
+ connection.close();
}
} catch (SQLException ex) {
try {
- connection.rollback();
+ if (connection != null) {
+ connection.rollback();
+ connection.close();
+ }
} catch (SQLException e) {
}
return false;