Author: reschke
Date: Mon Mar 31 21:07:28 2014
New Revision: 1583465

URL: http://svn.apache.org/r1583465
Log:
OAK-1533 - switch RDBBlobStore to leverage DataSource

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java?rev=1583465&r1=1583464&r2=1583465&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java Mon Mar 31 21:07:28 2014
@@ -47,7 +47,7 @@ public class RDBBlobStore extends Cachin
         try {
             String jdbcurl = "jdbc:h2:mem:oaknodes";
             DataSource ds = RDBDataSourceFactory.forJdbcUrl(jdbcurl, "sa", "");
-            initialize(ds.getConnection());
+            initialize(ds);
         } catch (Exception ex) {
             throw new MicroKernelException("initializing RDB blob store", ex);
         }
@@ -60,7 +60,7 @@ public class RDBBlobStore extends Cachin
     public RDBBlobStore(String jdbcurl, String username, String password) {
         try {
             DataSource ds = RDBDataSourceFactory.forJdbcUrl(jdbcurl, username, 
password);
-            initialize(ds.getConnection());
+            initialize(ds);
         } catch (Exception ex) {
             throw new MicroKernelException("initializing RDB blob store", ex);
         }
@@ -72,7 +72,7 @@ public class RDBBlobStore extends Cachin
      */
     public RDBBlobStore(DataSource ds) {
         try {
-            initialize(ds.getConnection());
+            initialize(ds);
         } catch (Exception ex) {
             throw new MicroKernelException("initializing RDB blob store", ex);
         }
@@ -80,17 +80,12 @@ public class RDBBlobStore extends Cachin
 
     @Override
     public void close() {
-        try {
-            this.connection.close();
-            this.connection = null;
-        } catch (SQLException ex) {
-            throw new MicroKernelException(ex);
-        }
+        this.ds = null;
     }
 
     @Override
     public void finalize() {
-        if (this.connection != null && this.callStack != null) {
+        if (this.ds != null && this.callStack != null) {
             LOG.debug("finalizing RDBDocumentStore that was not disposed", 
this.callStack);
         }
     }
@@ -99,47 +94,55 @@ public class RDBBlobStore extends Cachin
 
     private Exception callStack;
 
-    private Connection connection;
-
-    private void initialize(Connection con) throws Exception {
-        con.setAutoCommit(false);
+    private DataSource ds;
 
-        for (String tableName : new String[] { "DATASTORE_META", 
"DATASTORE_DATA" }) {
-            try {
-                PreparedStatement stmt = con.prepareStatement("select ID from 
" + tableName + " where ID = ?");
-                stmt.setString(1, "0");
-                stmt.executeQuery();
-            } catch (SQLException ex) {
-                // table does not appear to exist
-                con.rollback();
+    private void initialize(DataSource ds) throws Exception {
 
-                String dbtype = con.getMetaData().getDatabaseProductName();
-                LOG.info("Attempting to create table " + tableName + " in " + 
dbtype);
+        this.ds = ds;
+        Connection con = ds.getConnection();
 
-                Statement stmt = con.createStatement();
+        try {
+            con.setAutoCommit(false);
 
-                if (tableName.equals("DATASTORE_META")) {
-                    stmt.execute("create table " + tableName
-                            + " (ID varchar(1000) not null primary key, LEVEL 
int, LASTMOD bigint)");
-                } else {
-                    // the code below likely will need to be extended for new
-                    // database types
-                    if ("PostgreSQL".equals(dbtype)) {
-                        stmt.execute("create table " + tableName + " (ID 
varchar(1000) not null primary key, DATA bytea)");
-                    } else if ("DB2".equals(dbtype) || (dbtype != null && 
dbtype.startsWith("DB2/"))) {
-                        stmt.execute("create table " + tableName + " (ID 
varchar(1000) not null primary key, DATA blob)");
+            for (String tableName : new String[] { "DATASTORE_META", 
"DATASTORE_DATA" }) {
+                try {
+                    PreparedStatement stmt = con.prepareStatement("select ID 
from " + tableName + " where ID = ?");
+                    stmt.setString(1, "0");
+                    stmt.executeQuery();
+                } catch (SQLException ex) {
+                    // table does not appear to exist
+                    con.rollback();
+
+                    String dbtype = con.getMetaData().getDatabaseProductName();
+                    LOG.info("Attempting to create table " + tableName + " in 
" + dbtype);
+
+                    Statement stmt = con.createStatement();
+
+                    if (tableName.equals("DATASTORE_META")) {
+                        stmt.execute("create table " + tableName
+                                + " (ID varchar(1000) not null primary key, 
LEVEL int, LASTMOD bigint)");
                     } else {
-                        stmt.execute("create table " + tableName + " (ID 
varchar(1000) not null primary key, DATA blob)");
+                        // the code below likely will need to be extended for
+                        // new
+                        // database types
+                        if ("PostgreSQL".equals(dbtype)) {
+                            stmt.execute("create table " + tableName + " (ID 
varchar(1000) not null primary key, DATA bytea)");
+                        } else if ("DB2".equals(dbtype) || (dbtype != null && 
dbtype.startsWith("DB2/"))) {
+                            stmt.execute("create table " + tableName + " (ID 
varchar(1000) not null primary key, DATA blob)");
+                        } else {
+                            stmt.execute("create table " + tableName + " (ID 
varchar(1000) not null primary key, DATA blob)");
+                        }
                     }
-                }
 
-                stmt.close();
+                    stmt.close();
 
-                con.commit();
+                    con.commit();
+                }
             }
+        } finally {
+            // con.close();
         }
 
-        this.connection = con;
         this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of 
RDBBlobStore creation") : null;
     }
 
@@ -155,11 +158,14 @@ public class RDBBlobStore extends Cachin
     }
 
     private void storeBlockInDatabase(byte[] digest, int level, byte[] data) 
throws SQLException {
+
         String id = StringUtils.convertBytesToHex(digest);
         cache.put(id, data);
+        Connection con = ds.getConnection();
+
         try {
             long now = System.currentTimeMillis();
-            PreparedStatement prep = connection.prepareStatement("update 
datastore_meta set lastMod = ? where id = ?");
+            PreparedStatement prep = con.prepareStatement("update 
datastore_meta set lastMod = ? where id = ?");
             int count;
             try {
                 prep.setLong(1, now);
@@ -170,7 +176,7 @@ public class RDBBlobStore extends Cachin
             }
             if (count == 0) {
                 try {
-                    prep = connection.prepareStatement("insert into 
datastore_data(id, data) values(?, ?)");
+                    prep = con.prepareStatement("insert into 
datastore_data(id, data) values(?, ?)");
                     try {
                         prep.setString(1, id);
                         prep.setBytes(2, data);
@@ -182,7 +188,7 @@ public class RDBBlobStore extends Cachin
                     // already exists - ok
                 }
                 try {
-                    prep = connection.prepareStatement("insert into 
datastore_meta(id, level, lastMod) values(?, ?, ?)");
+                    prep = con.prepareStatement("insert into 
datastore_meta(id, level, lastMod) values(?, ?, ?)");
                     try {
                         prep.setString(1, id);
                         prep.setInt(2, level);
@@ -196,16 +202,20 @@ public class RDBBlobStore extends Cachin
                 }
             }
         } finally {
-            connection.commit();
+            con.commit();
+            con.close();
         }
     }
 
     @Override
     protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
+
         String id = StringUtils.convertBytesToHex(blockId.getDigest());
         byte[] data = cache.get(id);
+        Connection con = ds.getConnection();
+
         try {
-            PreparedStatement prep = connection.prepareStatement("select data 
from datastore_data where id = ?");
+            PreparedStatement prep = con.prepareStatement("select data from 
datastore_data where id = ?");
             try {
                 prep.setString(1, id);
                 ResultSet rs = prep.executeQuery();
@@ -218,7 +228,8 @@ public class RDBBlobStore extends Cachin
             }
             cache.put(id, data);
         } finally {
-            connection.commit();
+            con.commit();
+            con.close();
         }
         // System.out.println("    read block " + id + " blockLen: " +
         // data.length + " [0]: " + data[0]);
@@ -247,20 +258,21 @@ public class RDBBlobStore extends Cachin
 
     @Override
     protected void mark(BlockId blockId) throws Exception {
+        Connection con = ds.getConnection();
         try {
             if (minLastModified == 0) {
                 return;
             }
             String id = StringUtils.convertBytesToHex(blockId.getDigest());
-            PreparedStatement prep = connection
-                    .prepareStatement("update datastore_meta set lastMod = ? 
where id = ? and lastMod < ?");
+            PreparedStatement prep = con.prepareStatement("update 
datastore_meta set lastMod = ? where id = ? and lastMod < ?");
             prep.setLong(1, System.currentTimeMillis());
             prep.setString(2, id);
             prep.setLong(3, minLastModified);
             prep.executeUpdate();
             prep.close();
         } finally {
-            connection.commit();
+            con.commit();
+            con.close();
         }
     }
 
@@ -274,17 +286,18 @@ public class RDBBlobStore extends Cachin
     }
 
     private int sweepFromDatabase() throws SQLException {
+        Connection con = ds.getConnection();
         try {
             int count = 0;
-            PreparedStatement prep = connection.prepareStatement("select id 
from datastore_meta where lastMod < ?");
+            PreparedStatement prep = con.prepareStatement("select id from 
datastore_meta where lastMod < ?");
             prep.setLong(1, minLastModified);
             ResultSet rs = prep.executeQuery();
             ArrayList<String> ids = new ArrayList<String>();
             while (rs.next()) {
                 ids.add(rs.getString(1));
             }
-            prep = connection.prepareStatement("delete from datastore_meta 
where id = ?");
-            PreparedStatement prepData = connection.prepareStatement("delete 
from datastore_data where id = ?");
+            prep = con.prepareStatement("delete from datastore_meta where id = 
?");
+            PreparedStatement prepData = con.prepareStatement("delete from 
datastore_data where id = ?");
             for (String id : ids) {
                 prep.setString(1, id);
                 prep.execute();
@@ -297,12 +310,14 @@ public class RDBBlobStore extends Cachin
             minLastModified = 0;
             return count;
         } finally {
-            connection.commit();
+            con.commit();
+            con.close();
         }
     }
 
     @Override
     public boolean deleteChunks(List<String> chunkIds, long 
maxLastModifiedTime) throws Exception {
+        Connection con = ds.getConnection();
         try {
             PreparedStatement prep = null;
             PreparedStatement prepData = null;
@@ -317,25 +332,25 @@ public class RDBBlobStore extends Cachin
             }
 
             if (maxLastModifiedTime > 0) {
-                prep = connection.prepareStatement(
+                prep = con.prepareStatement(
                         "delete from datastore_meta where id in (" 
                                 + inClause.toString() + ") and lastMod <= ?");
                 prep.setLong(batch + 1, maxLastModifiedTime);
 
-                prepData = connection.prepareStatement(
+                prepData = con.prepareStatement(
                         "delete from datastore_data where id in (" 
                                 + inClause.toString() + ") and lastMod <= ?");
                 prepData.setLong(batch + 1, maxLastModifiedTime);
             } else {
-                prep = connection.prepareStatement(
+                prep = con.prepareStatement(
                         "delete from datastore_meta where id in (" 
                                 + inClause.toString() + ")");
 
-                prepData = connection.prepareStatement(
+                prepData = con.prepareStatement(
                         "delete from datastore_data where id in (" 
                                 + inClause.toString() + ")");
             }
-            
+
             for (int idx = 0; idx < batch; idx++) {
                 prep.setString(idx + 1, chunkIds.get(idx));
                 prepData.setString(idx + 1, chunkIds.get(idx));
@@ -346,7 +361,8 @@ public class RDBBlobStore extends Cachin
             prep.close();
             prepData.close();
         } finally {
-            connection.commit();
+            con.commit();
+            con.close();
         }
 
         return true;
@@ -354,7 +370,7 @@ public class RDBBlobStore extends Cachin
 
     @Override
     public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws 
Exception {
-        return new ChunkIdIterator(this.connection, maxLastModifiedTime);
+        return new ChunkIdIterator(this.ds, maxLastModifiedTime);
     }
 
 
@@ -364,14 +380,14 @@ public class RDBBlobStore extends Cachin
     private static class ChunkIdIterator extends AbstractIterator<String> {
 
         private long maxLastModifiedTime;
-        private Connection connection;
+        private DataSource ds;
         private static int BATCHSIZE = 1024 * 256;
         private List<String> results = new LinkedList<String>();
         private String lastId = null;
 
-        public ChunkIdIterator(Connection connection, long 
maxLastModifiedTime) {
+        public ChunkIdIterator(DataSource ds, long maxLastModifiedTime) {
             this.maxLastModifiedTime = maxLastModifiedTime;
-            this.connection = connection;
+            this.ds = ds;
         }
 
         @Override
@@ -403,7 +419,9 @@ public class RDBBlobStore extends Cachin
             }
             query.append(" order by id limit " + BATCHSIZE);
 
+            Connection connection = null;
             try {
+                connection = ds.getConnection();
                 try {
                     PreparedStatement prep = 
connection.prepareStatement(query.toString());
                     int idx = 1;
@@ -422,10 +440,14 @@ public class RDBBlobStore extends Cachin
                     return !results.isEmpty();
                 } finally {
                     connection.commit();
+                    connection.close();
                 }
             } catch (SQLException ex) {
                 try {
-                    connection.rollback();
+                    if (connection != null) {
+                        connection.rollback();
+                        connection.close();
+                    }
                 } catch (SQLException e) {
                 }
                 return false;


Reply via email to