Author: reschke
Date: Thu Nov 12 16:21:14 2015
New Revision: 1714084
URL: http://svn.apache.org/viewvc?rev=1714084&view=rev
Log:
OAK-3605: RDBBlob/DocumentStore: reduce class complexity
Move low-level DB code into separate class
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1714084&r1=1714083&r2=1714084&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
Thu Nov 12 16:21:14 2015
@@ -74,7 +74,6 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
-import
org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStoreDB.FETCHFIRSTSYNTAX;
import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
import org.apache.jackrabbit.oak.util.OakVersion;
import org.slf4j.Logger;
@@ -307,46 +306,15 @@ public class RDBDocumentStore implements
@Override
public long determineServerTimeDifferenceMillis() {
Connection connection = null;
- PreparedStatement stmt = null;
- ResultSet rs = null;
- String tableName = getTable(Collection.NODES).getName();
- long result;
try {
connection = this.ch.getROConnection();
- String t = "select ";
- if (this.db.getFetchFirstSyntax() == FETCHFIRSTSYNTAX.TOP) {
- t += "TOP 1 ";
- }
- t += this.db.getCurrentTimeStampInMsSyntax() + " from " +
tableName;
- switch (this.db.getFetchFirstSyntax()) {
- case LIMIT:
- t += " LIMIT 1";
- break;
- case FETCHFIRST:
- t += " FETCH FIRST 1 ROWS ONLY";
- break;
- default:
- break;
- }
-
- stmt = connection.prepareStatement(t);
- long start = System.currentTimeMillis();
- rs = stmt.executeQuery();
- if (rs.next()) {
- long roundtrip = System.currentTimeMillis() - start;
- long serverTime = rs.getTimestamp(1).getTime();
- result = (start + roundtrip / 2) - serverTime;
- } else {
- throw new DocumentStoreException("failed to determine server
timestamp");
- }
+ long result =
this.db.determineServerTimeDifferenceMillis(connection,
getTable(Collection.NODES));
connection.commit();
return result;
- } catch (Exception ex) {
- LOG.error("", ex);
+ } catch (SQLException ex) {
+ LOG.error("Trying to determine time difference to server", ex);
throw new DocumentStoreException(ex);
} finally {
- closeResultSet(rs);
- closeStatement(stmt);
this.ch.closeConnection(connection);
}
}
@@ -404,13 +372,13 @@ public class RDBDocumentStore implements
* Holds the data about a table that can vary: name, whether the primary
key
* is binary, and the estimated size of the "data" column.
*/
- private static class TableMetaData {
+ static class RDBTableMetaData {
final String name;
boolean idIsBinary = false;
private int dataLimitInOctets = 16384;
- public TableMetaData(String name) {
+ public RDBTableMetaData(String name) {
this.name = name;
}
@@ -435,7 +403,7 @@ public class RDBDocumentStore implements
}
}
- private final Map<Collection<? extends Document>, TableMetaData> tableMeta
= new HashMap<Collection<? extends Document>, TableMetaData>();
+ private final Map<Collection<? extends Document>, RDBTableMetaData>
tableMeta = new HashMap<Collection<? extends Document>, RDBTableMetaData>();
@Override
public void dispose() {
@@ -501,7 +469,7 @@ public class RDBDocumentStore implements
/**
* Optional counter for changes to "_collisions" map ({@link
NodeDocument#COLLISIONS}).
*/
- private static final String COLLISIONSMODCOUNT = "_collisionsModCount";
+ public static final String COLLISIONSMODCOUNT = "_collisionsModCount";
private static final String ID = "_id";
@@ -520,7 +488,7 @@ public class RDBDocumentStore implements
// a) single characters will fit into 3 bytes
// b) a surrogate pair (two Java characters) will fit into 4 bytes
// thus...
- private static final int CHAR2OCTETRATIO = 3;
+ public static final int CHAR2OCTETRATIO = 3;
// number of retries for updates
private static final int RETRIES = 10;
@@ -531,7 +499,10 @@ public class RDBDocumentStore implements
private static final Key MODIFIEDKEY = new Key(MODIFIED, null);
// DB-specific information
- private RDBDocumentStoreDB db;
+ private RDBDocumentStoreDB dbInfo;
+
+ // utility class for performing low-level operations
+ private RDBDocumentStoreJDBC db;
private Map<String, String> metadata;
@@ -543,18 +514,18 @@ public class RDBDocumentStore implements
private static final Set<String> COLUMNPROPERTIES = new
HashSet<String>(Arrays.asList(new String[] { ID,
NodeDocument.HAS_BINARY_FLAG, NodeDocument.DELETED_ONCE,
COLLISIONSMODCOUNT, MODIFIED, MODCOUNT }));
- private final RDBDocumentSerializer SR = new RDBDocumentSerializer(this,
COLUMNPROPERTIES);
+ private final RDBDocumentSerializer ser = new RDBDocumentSerializer(this,
COLUMNPROPERTIES);
private void initialize(DataSource ds, DocumentMK.Builder builder,
RDBOptions options) throws Exception {
this.tableMeta.put(Collection.NODES,
- new TableMetaData(createTableName(options.getTablePrefix(),
TABLEMAP.get(Collection.NODES))));
+ new RDBTableMetaData(createTableName(options.getTablePrefix(),
TABLEMAP.get(Collection.NODES))));
this.tableMeta.put(Collection.CLUSTER_NODES,
- new TableMetaData(createTableName(options.getTablePrefix(),
TABLEMAP.get(Collection.CLUSTER_NODES))));
+ new RDBTableMetaData(createTableName(options.getTablePrefix(),
TABLEMAP.get(Collection.CLUSTER_NODES))));
this.tableMeta.put(Collection.JOURNAL,
- new TableMetaData(createTableName(options.getTablePrefix(),
TABLEMAP.get(Collection.JOURNAL))));
+ new RDBTableMetaData(createTableName(options.getTablePrefix(),
TABLEMAP.get(Collection.JOURNAL))));
this.tableMeta.put(Collection.SETTINGS,
- new TableMetaData(createTableName(options.getTablePrefix(),
TABLEMAP.get(Collection.SETTINGS))));
+ new RDBTableMetaData(createTableName(options.getTablePrefix(),
TABLEMAP.get(Collection.SETTINGS))));
this.ch = new RDBConnectionHandler(ds);
this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of
RDBDocumentStore creation") : null;
@@ -580,22 +551,23 @@ public class RDBDocumentStore implements
md.getDriverMinorVersion()).replaceAll("[\r\n\t]", " ").trim();
String dbUrl = md.getURL();
- this.db = RDBDocumentStoreDB.getValue(md.getDatabaseProductName());
+ this.dbInfo = RDBDocumentStoreDB.getValue(md.getDatabaseProductName());
+ this.db = new RDBDocumentStoreJDBC(this.dbInfo, this.ser,
QUERYHITSLIMIT, QUERYTIMELIMIT);
this.metadata = ImmutableMap.<String,String>builder()
.put("type", "rdb")
.put("db", md.getDatabaseProductName())
.put("version", md.getDatabaseProductVersion())
.build();
- String versionDiags = db.checkVersion(md);
+ String versionDiags = dbInfo.checkVersion(md);
if (!versionDiags.isEmpty()) {
LOG.info(versionDiags);
}
- if (! "".equals(db.getInitializationStatement())) {
+ if (! "".equals(dbInfo.getInitializationStatement())) {
Statement stmt = null;
try {
stmt = con.createStatement();
- stmt.execute(db.getInitializationStatement());
+ stmt.execute(dbInfo.getInitializationStatement());
stmt.close();
con.commit();
}
@@ -629,7 +601,7 @@ public class RDBDocumentStore implements
tableDiags.insert(0, ", ");
}
- String diag = db.getAdditionalDiagnostics(this.ch,
this.tableMeta.get(Collection.NODES).getName());
+ String diag = dbInfo.getAdditionalDiagnostics(this.ch,
this.tableMeta.get(Collection.NODES).getName());
LOG.info("RDBDocumentStore (" + OakVersion.getVersion() + ")
instantiated for database " + dbDesc + ", using driver: "
+ driverDesc + ", connecting to: " + dbUrl + (diag.isEmpty() ?
"" : (", properties: " + diag))
@@ -647,7 +619,7 @@ public class RDBDocumentStore implements
return sqlType == Types.VARBINARY || sqlType == Types.BINARY ||
sqlType == Types.LONGVARBINARY;
}
- private void obtainFlagsFromResultSetMeta(ResultSetMetaData met,
TableMetaData tmd) throws SQLException {
+ private void obtainFlagsFromResultSetMeta(ResultSetMetaData met,
RDBTableMetaData tmd) throws SQLException {
for (int i = 1; i <= met.getColumnCount(); i++) {
String lcName = met.getColumnName(i).toLowerCase(Locale.ENGLISH);
if ("id".equals(lcName)) {
@@ -758,9 +730,9 @@ public class RDBDocumentStore implements
}
}
- private void createTableFor(Connection con, Collection<? extends Document>
col, TableMetaData tmd, List<String> tablesCreated,
+ private void createTableFor(Connection con, Collection<? extends Document>
col, RDBTableMetaData tmd, List<String> tablesCreated,
List<String> tablesPresent, StringBuilder diagnostics) throws
SQLException {
- String dbname = this.db.toString();
+ String dbname = this.dbInfo.toString();
if (con.getMetaData().getURL() != null) {
dbname += " (" + con.getMetaData().getURL() + ")";
}
@@ -794,10 +766,10 @@ public class RDBDocumentStore implements
try {
creatStatement = con.createStatement();
-
creatStatement.execute(this.db.getTableCreationStatement(tableName));
+
creatStatement.execute(this.dbInfo.getTableCreationStatement(tableName));
creatStatement.close();
- for (String ic :
this.db.getIndexCreationStatements(tableName)) {
+ for (String ic :
this.dbInfo.getIndexCreationStatements(tableName)) {
creatStatement = con.createStatement();
creatStatement.execute(ic);
creatStatement.close();
@@ -1073,7 +1045,7 @@ public class RDBDocumentStore implements
Operation modOperation = update.getChanges().get(MODIFIEDKEY);
long modified = getModifiedFromOperation(modOperation);
boolean modifiedIsConditional = modOperation == null ||
modOperation.type != UpdateOp.Operation.Type.SET;
- String appendData = SR.asString(update);
+ String appendData = ser.asString(update);
for (List<String> chunkedIds : Lists.partition(ids, CHUNKSIZE)) {
// remember what we already have in the cache
@@ -1093,11 +1065,11 @@ public class RDBDocumentStore implements
}
Connection connection = null;
- TableMetaData tmd = getTable(collection);
+ RDBTableMetaData tmd = getTable(collection);
boolean success = false;
try {
connection = this.ch.getRWConnection();
- success = dbBatchedAppendingUpdate(connection, tmd,
chunkedIds, modified, modifiedIsConditional, appendData);
+ success = db.batchedAppendingUpdate(connection, tmd,
chunkedIds, modified, modifiedIsConditional, appendData);
connection.commit();
} catch (SQLException ex) {
success = false;
@@ -1213,7 +1185,7 @@ public class RDBDocumentStore implements
private <T extends Document> List<T> internalQuery(Collection<T>
collection, String fromKey, String toKey,
String indexedProperty, long startValue, int limit) {
Connection connection = null;
- TableMetaData tmd = getTable(collection);
+ RDBTableMetaData tmd = getTable(collection);
if (indexedProperty != null &&
(!INDEXEDPROPERTIES.contains(indexedProperty))) {
String message = "indexed property " + indexedProperty + " not
supported, query was '>= '" + startValue
+ "'; supported properties are " + INDEXEDPROPERTIES;
@@ -1227,7 +1199,7 @@ public class RDBDocumentStore implements
connection = this.ch.getROConnection();
String from = collection == Collection.NODES &&
NodeDocument.MIN_ID_VALUE.equals(fromKey) ? null : fromKey;
String to = collection == Collection.NODES &&
NodeDocument.MAX_ID_VALUE.equals(toKey) ? null : toKey;
- List<RDBRow> dbresult = dbQuery(connection, tmd, from, to,
indexedProperty, startValue, limit);
+ List<RDBRow> dbresult = db.query(connection, tmd, from, to,
indexedProperty, startValue, limit);
connection.commit();
int size = dbresult.size();
@@ -1249,8 +1221,8 @@ public class RDBDocumentStore implements
}
@Nonnull
- private <T extends Document> TableMetaData getTable(Collection<T>
collection) {
- TableMetaData tmd = this.tableMeta.get(collection);
+ private <T extends Document> RDBTableMetaData getTable(Collection<T>
collection) {
+ RDBTableMetaData tmd = this.tableMeta.get(collection);
if (tmd != null) {
return tmd;
} else {
@@ -1261,14 +1233,14 @@ public class RDBDocumentStore implements
@CheckForNull
private <T extends Document> T readDocumentUncached(Collection<T>
collection, String id, NodeDocument cachedDoc) {
Connection connection = null;
- TableMetaData tmd = getTable(collection);
+ RDBTableMetaData tmd = getTable(collection);
try {
long lastmodcount = -1;
if (cachedDoc != null) {
lastmodcount = modcountOf(cachedDoc);
}
connection = this.ch.getROConnection();
- RDBRow row = dbRead(connection, tmd, id, lastmodcount);
+ RDBRow row = db.read(connection, tmd, id, lastmodcount);
connection.commit();
if (row == null) {
return null;
@@ -1290,10 +1262,10 @@ public class RDBDocumentStore implements
private <T extends Document> void delete(Collection<T> collection, String
id) {
Connection connection = null;
- TableMetaData tmd = getTable(collection);
+ RDBTableMetaData tmd = getTable(collection);
try {
connection = this.ch.getRWConnection();
- dbDelete(connection, tmd, Collections.singletonList(id));
+ db.delete(connection, tmd, Collections.singletonList(id));
connection.commit();
} catch (Exception ex) {
throw new DocumentStoreException(ex);
@@ -1304,12 +1276,12 @@ public class RDBDocumentStore implements
private <T extends Document> int delete(Collection<T> collection,
List<String> ids) {
int numDeleted = 0;
- TableMetaData tmd = getTable(collection);
+ RDBTableMetaData tmd = getTable(collection);
for (List<String> sublist : Lists.partition(ids, 64)) {
Connection connection = null;
try {
connection = this.ch.getRWConnection();
- numDeleted += dbDelete(connection, tmd, sublist);
+ numDeleted += db.delete(connection, tmd, sublist);
connection.commit();
} catch (Exception ex) {
throw new DocumentStoreException(ex);
@@ -1323,7 +1295,7 @@ public class RDBDocumentStore implements
private <T extends Document> int delete(Collection<T> collection,
Map<String, Map<Key, Condition>>
toRemove) {
int numDeleted = 0;
- TableMetaData tmd = getTable(collection);
+ RDBTableMetaData tmd = getTable(collection);
Map<String, Map<Key, Condition>> subMap = Maps.newHashMap();
Iterator<Entry<String, Map<Key, Condition>>> it =
toRemove.entrySet().iterator();
while (it.hasNext()) {
@@ -1333,7 +1305,7 @@ public class RDBDocumentStore implements
Connection connection = null;
try {
connection = this.ch.getRWConnection();
- numDeleted += dbDelete(connection, tmd, subMap);
+ numDeleted += db.delete(connection, tmd, subMap);
connection.commit();
} catch (Exception ex) {
throw DocumentStoreException.convert(ex);
@@ -1349,7 +1321,7 @@ public class RDBDocumentStore implements
private <T extends Document> boolean updateDocument(@Nonnull Collection<T>
collection, @Nonnull T document,
@Nonnull UpdateOp update, Long oldmodcount) {
Connection connection = null;
- TableMetaData tmd = getTable(collection);
+ RDBTableMetaData tmd = getTable(collection);
String data = null;
try {
connection = this.ch.getRWConnection();
@@ -1367,10 +1339,10 @@ public class RDBDocumentStore implements
// every 16th update is a full rewrite
if (isAppendableUpdate(update) && modcount % 16 != 0) {
- String appendData = SR.asString(update);
+ String appendData = ser.asString(update);
if (appendData.length() < tmd.getDataLimitInOctets() /
CHAR2OCTETRATIO) {
try {
- success = dbAppendingUpdate(connection, tmd,
document.getId(), modified, modifiedIsConditional, hasBinary,
+ success = db.appendingUpdate(connection, tmd,
document.getId(), modified, modifiedIsConditional, hasBinary,
deletedOnce, modcount, cmodcount, oldmodcount,
appendData);
// if we get here, a retry is not going to help (the
SQL
// operation succeeded but simply did not select a row
@@ -1386,8 +1358,8 @@ public class RDBDocumentStore implements
}
}
if (!success && shouldRetry) {
- data = SR.asString(document);
- success = dbUpdate(connection, tmd, document.getId(),
modified, hasBinary, deletedOnce, modcount, cmodcount,
+ data = ser.asString(document);
+ success = db.update(connection, tmd, document.getId(),
modified, hasBinary, deletedOnce, modcount, cmodcount,
oldmodcount, data);
connection.commit();
}
@@ -1439,10 +1411,10 @@ public class RDBDocumentStore implements
private <T extends Document> boolean insertDocuments(Collection<T>
collection, List<T> documents) {
Connection connection = null;
- TableMetaData tmd = getTable(collection);
+ RDBTableMetaData tmd = getTable(collection);
try {
connection = this.ch.getRWConnection();
- boolean result = dbInsert(connection, tmd, documents);
+ boolean result = db.insert(connection, tmd, documents);
connection.commit();
return result;
} catch (SQLException ex) {
@@ -1470,7 +1442,7 @@ public class RDBDocumentStore implements
int longest = 0, longestChars = 0;
for (Document d : documents) {
- String data = SR.asString(d);
+ String data = ser.asString(d);
byte bytes[] = asBytes(data);
if (bytes.length > longest) {
longest = bytes.length;
@@ -1510,7 +1482,7 @@ public class RDBDocumentStore implements
private static final int QUERYTIMELIMIT = Integer.getInteger(
"org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.QUERYTIMELIMIT",
10000);
- private static byte[] asBytes(String data) {
+ public static byte[] asBytes(String data) {
byte[] bytes;
try {
bytes = data.getBytes("UTF-8");
@@ -1540,446 +1512,6 @@ public class RDBDocumentStore implements
}
}
- private static void setIdInStatement(TableMetaData tmd, PreparedStatement
stmt, int idx, String id) throws SQLException {
- if (tmd.isIdBinary()) {
- try {
- stmt.setBytes(idx, id.getBytes("UTF-8"));
- } catch (UnsupportedEncodingException ex) {
- LOG.error("UTF-8 not supported??", ex);
- throw new DocumentStoreException(ex);
- }
- } else {
- stmt.setString(idx, id);
- }
- }
-
- private static String getIdFromRS(TableMetaData tmd, ResultSet rs, int
idx) throws SQLException {
- if (tmd.isIdBinary()) {
- try {
- return new String(rs.getBytes(idx), "UTF-8");
- } catch (UnsupportedEncodingException ex) {
- LOG.error("UTF-8 not supported??", ex);
- throw new DocumentStoreException(ex);
- }
- } else {
- return rs.getString(idx);
- }
- }
-
- @CheckForNull
- private RDBRow dbRead(Connection connection, TableMetaData tmd, String id,
long lastmodcount) throws SQLException {
- PreparedStatement stmt;
-
- boolean useCaseStatement = lastmodcount != -1 &&
this.db.allowsCaseInSelect();
- if (useCaseStatement) {
- // the case statement causes the actual row data not to be
- // sent in case we already have it
- stmt = connection
- .prepareStatement("select MODIFIED, MODCOUNT, CMODCOUNT,
HASBINARY, DELETEDONCE, case MODCOUNT when ? then null else DATA end as DATA, "
- + "case MODCOUNT when ? then null else BDATA end
as BDATA from " + tmd.getName() + " where ID = ?");
- } else {
- // either we don't have a previous version of the document
- // or the database does not support CASE in SELECT
- stmt = connection.prepareStatement("select MODIFIED, MODCOUNT,
CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA from "
- + tmd.getName() + " where ID = ?");
- }
-
- try {
- int si = 1;
- if (useCaseStatement) {
- stmt.setLong(si++, lastmodcount);
- stmt.setLong(si++, lastmodcount);
- }
- setIdInStatement(tmd, stmt, si, id);
-
- ResultSet rs = stmt.executeQuery();
- if (rs.next()) {
- long modified = rs.getLong(1);
- long modcount = rs.getLong(2);
- long cmodcount = rs.getLong(3);
- long hasBinary = rs.getLong(4);
- long deletedOnce = rs.getLong(5);
- String data = rs.getString(6);
- byte[] bdata = rs.getBytes(7);
- return new RDBRow(id, hasBinary == 1, deletedOnce == 1,
modified, modcount, cmodcount, data, bdata);
- } else {
- return null;
- }
- } catch (SQLException ex) {
- LOG.error("attempting to read " + id + " (id length is " +
id.length() + ")", ex);
- // DB2 throws an SQLException for invalid keys; handle this more
- // gracefully
- if ("22001".equals(ex.getSQLState())) {
- this.ch.rollbackConnection(connection);
- return null;
- } else {
- throw (ex);
- }
- } finally {
- stmt.close();
- }
- }
-
- private List<RDBRow> dbQuery(Connection connection, TableMetaData tmd,
String minId, String maxId, String indexedProperty,
- long startValue, int limit) throws SQLException {
- long start = System.currentTimeMillis();
- StringBuilder selectClause = new StringBuilder();
- StringBuilder whereClause = new StringBuilder();
- if (limit != Integer.MAX_VALUE && this.db.getFetchFirstSyntax() ==
FETCHFIRSTSYNTAX.TOP) {
- selectClause.append("TOP " + limit + " ");
- }
- selectClause.append("ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY,
DELETEDONCE, DATA, BDATA from ").append(tmd.getName());
-
- // dynamically build where clause
- String whereSep = "";
- if (minId != null) {
- whereClause.append("ID > ?");
- whereSep = " and ";
- }
- if (maxId != null) {
- whereClause.append(whereSep).append("ID < ?");
- whereSep = " and ";
- }
-
- if (indexedProperty != null) {
- if (MODIFIED.equals(indexedProperty)) {
- whereClause.append(whereSep).append("MODIFIED >= ?");
- } else if (NodeDocument.HAS_BINARY_FLAG.equals(indexedProperty)) {
- if (startValue != NodeDocument.HAS_BINARY_VAL) {
- throw new DocumentStoreException("unsupported value for
property " + NodeDocument.HAS_BINARY_FLAG);
- }
- whereClause.append(whereSep).append("HASBINARY = 1");
- } else if (NodeDocument.DELETED_ONCE.equals(indexedProperty)) {
- if (startValue != 1) {
- throw new DocumentStoreException("unsupported value for
property " + NodeDocument.DELETED_ONCE);
- }
- whereClause.append(whereSep).append("DELETEDONCE = 1");
- } else {
- throw new DocumentStoreException("unsupported indexed
property: " + indexedProperty);
- }
- }
-
- StringBuilder query = new StringBuilder();
- query.append("select ").append(selectClause);
- if (whereClause.length() != 0) {
- query.append(" where ").append(whereClause);
- }
-
- query.append(" order by ID");
-
- if (limit != Integer.MAX_VALUE) {
- switch (this.db.getFetchFirstSyntax()) {
- case LIMIT:
- query.append(" LIMIT " + limit);
- break;
- case FETCHFIRST:
- query.append(" FETCH FIRST " + limit + " ROWS ONLY");
- break;
- default:
- break;
- }
- }
-
- PreparedStatement stmt = connection.prepareStatement(query.toString());
- List<RDBRow> result = new ArrayList<RDBRow>();
- long dataTotal = 0, bdataTotal = 0;
- try {
- int si = 1;
- if (minId != null) {
- setIdInStatement(tmd, stmt, si++, minId);
- }
- if (maxId != null) {
- setIdInStatement(tmd, stmt, si++, maxId);
- }
-
- if (MODIFIED.equals(indexedProperty)) {
- stmt.setLong(si++, startValue);
- }
- if (limit != Integer.MAX_VALUE) {
- stmt.setFetchSize(limit);
- }
- ResultSet rs = stmt.executeQuery();
- while (rs.next() && result.size() < limit) {
- String id = getIdFromRS(tmd, rs, 1);
-
- if ((minId != null && id.compareTo(minId) < 0) || (maxId !=
null && id.compareTo(maxId) > 0)) {
- throw new DocumentStoreException(
- "unexpected query result: '" + minId + "' < '" +
id + "' < '" + maxId + "' - broken DB collation?");
- }
- long modified = rs.getLong(2);
- long modcount = rs.getLong(3);
- long cmodcount = rs.getLong(4);
- long hasBinary = rs.getLong(5);
- long deletedOnce = rs.getLong(6);
- String data = rs.getString(7);
- byte[] bdata = rs.getBytes(8);
- result.add(new RDBRow(id, hasBinary == 1, deletedOnce == 1,
modified, modcount, cmodcount, data, bdata));
- dataTotal += data.length();
- bdataTotal += bdata == null ? 0 : bdata.length;
- }
- } finally {
- stmt.close();
- }
-
- long elapsed = System.currentTimeMillis() - start;
- if (QUERYHITSLIMIT != 0 && result.size() > QUERYHITSLIMIT) {
- String message = String.format("Potentially excessive query with
%d hits (limited to %d, configured QUERYHITSLIMIT %d), elapsed time %dms,
params minid '%s' maxid '%s' indexedProperty %s startValue %d limit %d. Check
calling method.",
- result.size(), limit, QUERYHITSLIMIT, elapsed, minId,
maxId, indexedProperty, startValue, limit);
- LOG.info(message, new Exception("call stack"));
- }
- else if (QUERYTIMELIMIT != 0 && elapsed > QUERYTIMELIMIT) {
- String message = String.format("Long running query with %d hits
(limited to %d), elapsed time %dms (configured QUERYTIMELIMIT %d), params minid
'%s' maxid '%s' indexedProperty %s startValue %d limit %d. Read %d chars from
DATA and %d bytes from BDATA. Check calling method.",
- result.size(), limit, elapsed, QUERYTIMELIMIT, minId,
maxId, indexedProperty, startValue, limit, dataTotal, bdataTotal);
- LOG.info(message, new Exception("call stack"));
- }
-
- return result;
- }
-
- private boolean dbUpdate(Connection connection, TableMetaData tmd, String
id, Long modified, Boolean hasBinary,
- Boolean deletedOnce, Long modcount, Long cmodcount, Long
oldmodcount, String data) throws SQLException {
- String t = "update "
- + tmd.getName()
- + " set MODIFIED = ?, HASBINARY = ?, DELETEDONCE = ?, MODCOUNT
= ?, CMODCOUNT = ?, DSIZE = ?, DATA = ?, BDATA = ? where ID = ?";
- if (oldmodcount != null) {
- t += " and MODCOUNT = ?";
- }
- PreparedStatement stmt = connection.prepareStatement(t);
- try {
- int si = 1;
- stmt.setObject(si++, modified, Types.BIGINT);
- stmt.setObject(si++, hasBinary ? 1 : 0, Types.SMALLINT);
- stmt.setObject(si++, deletedOnce ? 1 : 0, Types.SMALLINT);
- stmt.setObject(si++, modcount, Types.BIGINT);
- stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) :
cmodcount, Types.BIGINT);
- stmt.setObject(si++, data.length(), Types.BIGINT);
-
- if (data.length() < tmd.getDataLimitInOctets() / CHAR2OCTETRATIO) {
- stmt.setString(si++, data);
- stmt.setBinaryStream(si++, null, 0);
- } else {
- stmt.setString(si++, "\"blob\"");
- byte[] bytes = asBytes(data);
- stmt.setBytes(si++, bytes);
- }
-
- setIdInStatement(tmd, stmt, si++, id);
-
- if (oldmodcount != null) {
- stmt.setObject(si++, oldmodcount, Types.BIGINT);
- }
- int result = stmt.executeUpdate();
- if (result != 1) {
- LOG.debug("DB update failed for " + tmd.getName() + "/" + id +
" with oldmodcount=" + oldmodcount);
- }
- return result == 1;
- } finally {
- stmt.close();
- }
- }
-
- private boolean dbAppendingUpdate(Connection connection, TableMetaData
tmd, String id, Long modified,
- boolean setModifiedConditionally, Boolean hasBinary, Boolean
deletedOnce, Long modcount, Long cmodcount,
- Long oldmodcount, String appendData) throws SQLException {
- StringBuilder t = new StringBuilder();
- t.append("update " + tmd.getName() + " set ");
- t.append(setModifiedConditionally ? "MODIFIED = case when ? > MODIFIED
then ? else MODIFIED end, " : "MODIFIED = ?, ");
- t.append("HASBINARY = ?, DELETEDONCE = ?, MODCOUNT = ?, CMODCOUNT = ?,
DSIZE = DSIZE + ?, ");
- t.append("DATA = " +
this.db.getConcatQueryString(tmd.getDataLimitInOctets(), appendData.length()) +
" ");
- t.append("where ID = ?");
- if (oldmodcount != null) {
- t.append(" and MODCOUNT = ?");
- }
- PreparedStatement stmt = connection.prepareStatement(t.toString());
- try {
- int si = 1;
- stmt.setObject(si++, modified, Types.BIGINT);
- if (setModifiedConditionally) {
- stmt.setObject(si++, modified, Types.BIGINT);
- }
- stmt.setObject(si++, hasBinary ? 1 : 0, Types.SMALLINT);
- stmt.setObject(si++, deletedOnce ? 1 : 0, Types.SMALLINT);
- stmt.setObject(si++, modcount, Types.BIGINT);
- stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) :
cmodcount, Types.BIGINT);
- stmt.setObject(si++, 1 + appendData.length(), Types.BIGINT);
- stmt.setString(si++, "," + appendData);
- setIdInStatement(tmd, stmt, si++, id);
-
- if (oldmodcount != null) {
- stmt.setObject(si++, oldmodcount, Types.BIGINT);
- }
- int result = stmt.executeUpdate();
- if (result != 1) {
- LOG.debug("DB append update failed for " + tmd.getName() + "/"
+ id + " with oldmodcount=" + oldmodcount);
- }
- return result == 1;
- } finally {
- stmt.close();
- }
- }
-
- private boolean dbBatchedAppendingUpdate(Connection connection,
TableMetaData tmd, List<String> ids, Long modified,
- boolean setModifiedConditionally,
- String appendData) throws SQLException {
- StringBuilder t = new StringBuilder();
- t.append("update " + tmd.getName() + " set ");
- t.append(setModifiedConditionally ? "MODIFIED = case when ? > MODIFIED
then ? else MODIFIED end, " : "MODIFIED = ?, ");
- t.append("MODCOUNT = MODCOUNT + 1, DSIZE = DSIZE + ?, ");
- t.append("DATA = " +
this.db.getConcatQueryString(tmd.getDataLimitInOctets(), appendData.length()) +
" ");
- t.append("where ID in (");
- for (int i = 0; i < ids.size(); i++) {
- if (i != 0) {
- t.append(',');
- }
- t.append('?');
- }
- t.append(")");
- PreparedStatement stmt = connection.prepareStatement(t.toString());
- try {
- int si = 1;
- stmt.setObject(si++, modified, Types.BIGINT);
- if (setModifiedConditionally) {
- stmt.setObject(si++, modified, Types.BIGINT);
- }
- stmt.setObject(si++, 1 + appendData.length(), Types.BIGINT);
- stmt.setString(si++, "," + appendData);
- for (String id : ids) {
- setIdInStatement(tmd, stmt, si++, id);
- }
- int result = stmt.executeUpdate();
- if (result != ids.size()) {
- LOG.debug("DB update failed: only " + result + " of " +
ids.size() + " updated. Table: " + tmd.getName() + ", IDs:"
- + ids);
- }
- return result == ids.size();
- } finally {
- stmt.close();
- }
- }
-
- private <T extends Document> boolean dbInsert(Connection connection,
TableMetaData tmd, List<T> documents) throws SQLException {
-
- PreparedStatement stmt = connection.prepareStatement("insert into " +
tmd.getName() +
- "(ID, MODIFIED, HASBINARY, DELETEDONCE, MODCOUNT, CMODCOUNT,
DSIZE, DATA, BDATA) " +
- "values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
-
- try {
- for (T document : documents) {
- String data = SR.asString(document);
- String id = document.getId();
- Number hasBinary = (Number)
document.get(NodeDocument.HAS_BINARY_FLAG);
- Boolean deletedOnce = (Boolean)
document.get(NodeDocument.DELETED_ONCE);
- Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT);
-
- int si = 1;
- setIdInStatement(tmd, stmt, si++, id);
- stmt.setObject(si++, document.get(MODIFIED), Types.BIGINT);
- stmt.setObject(si++, (hasBinary != null &&
hasBinary.intValue() == NodeDocument.HAS_BINARY_VAL) ? 1 : 0, Types.SMALLINT);
- stmt.setObject(si++, (deletedOnce != null && deletedOnce) ? 1
: 0, Types.SMALLINT);
- stmt.setObject(si++, document.get(MODCOUNT), Types.BIGINT);
- stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) :
cmodcount, Types.BIGINT);
- stmt.setObject(si++, data.length(), Types.BIGINT);
- if (data.length() < tmd.getDataLimitInOctets() /
CHAR2OCTETRATIO) {
- stmt.setString(si++, data);
- stmt.setBinaryStream(si++, null, 0);
- } else {
- stmt.setString(si++, "\"blob\"");
- byte[] bytes = asBytes(data);
- stmt.setBytes(si++, bytes);
- }
- stmt.addBatch();
- }
- int[] results = stmt.executeBatch();
- boolean success = true;
- for (int i = 0; i < documents.size(); i++) {
- int result = results[i];
- if (result != 1 && result != Statement.SUCCESS_NO_INFO) {
- LOG.error("DB insert failed for {}: {}", tmd.getName(),
documents.get(i).getId());
- success = false;
- }
- }
- return success;
- } finally {
- stmt.close();
- }
- }
-
- private int dbDelete(Connection connection, TableMetaData tmd,
List<String> ids) throws SQLException {
-
- PreparedStatement stmt;
- int cnt = ids.size();
-
- if (cnt == 1) {
- stmt = connection.prepareStatement("delete from " + tmd.getName()
+ " where ID=?");
- } else {
- StringBuilder inClause = new StringBuilder();
- for (int i = 0; i < cnt; i++) {
- inClause.append('?');
- if (i != cnt - 1) {
- inClause.append(',');
- }
- }
- stmt = connection.prepareStatement("delete from " + tmd.getName()
+ " where ID in (" + inClause.toString() + ")");
- }
-
- try {
- for (int i = 0; i < cnt; i++) {
- setIdInStatement(tmd, stmt, i + 1, ids.get(i));
- }
- int result = stmt.executeUpdate();
- if (result != cnt) {
- LOG.debug("DB delete failed for " + tmd.getName() + "/" + ids);
- }
- return result;
- } finally {
- stmt.close();
- }
- }
-
- private int dbDelete(Connection connection, TableMetaData tmd,
- Map<String, Map<Key, Condition>> toDelete)
- throws SQLException, DocumentStoreException {
- String or = "";
- StringBuilder whereClause = new StringBuilder();
- for (Entry<String, Map<Key, Condition>> entry : toDelete.entrySet()) {
- whereClause.append(or);
- or = " or ";
- whereClause.append("ID=?");
- for (Entry<Key, Condition> c : entry.getValue().entrySet()) {
- if (!c.getKey().getName().equals(MODIFIED)) {
- throw new DocumentStoreException(
- "Unsupported condition: " + c);
- }
- whereClause.append(" and MODIFIED");
- if (c.getValue().type == Condition.Type.EQUALS
- && c.getValue().value instanceof Long) {
- whereClause.append("=?");
- } else if (c.getValue().type == Condition.Type.EXISTS) {
- whereClause.append(" is not null");
- } else {
- throw new DocumentStoreException(
- "Unsupported condition: " + c);
- }
- }
- }
-
- PreparedStatement stmt= connection.prepareStatement(
- "delete from " + tmd.getName() + " where " + whereClause);
- try {
- int i = 1;
- for (Entry<String, Map<Key, Condition>> entry :
toDelete.entrySet()) {
- setIdInStatement(tmd, stmt, i++, entry.getKey());
- for (Entry<Key, Condition> c : entry.getValue().entrySet()) {
- if (c.getValue().type == Condition.Type.EQUALS) {
- stmt.setLong(i++, (Long) c.getValue().value);
- }
- }
- }
- return stmt.executeUpdate();
- } finally {
- stmt.close();
- }
- }
@Override
public void setReadWriteMode(String readWriteMode) {
@@ -2109,7 +1641,7 @@ public class RDBDocumentStore implements
@Nonnull
protected <T extends Document> T convertFromDBObject(@Nonnull
Collection<T> collection, @Nonnull RDBRow row) {
// this method is present here in order to facilitate unit testing for
OAK-3566
- return SR.fromRow(collection, row);
+ return ser.fromRow(collection, row);
}
private <T extends Document> T runThroughCache(Collection<T> collection,
RDBRow row, long now, QueryContext qp) {
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java?rev=1714084&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
Thu Nov 12 16:21:14 2015
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.rdb;
+
+import static
org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.CHAR2OCTETRATIO;
+import static
org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.asBytes;
+import static
org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeResultSet;
+import static
org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeStatement;
+
+import java.io.UnsupportedEncodingException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
+import
org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.RDBTableMetaData;
+import
org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStoreDB.FETCHFIRSTSYNTAX;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements (most) DB interactions used in {@link RDBDocumentStore}.
+ */
+public class RDBDocumentStoreJDBC {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RDBDocumentStoreJDBC.class);
+
+ private static final String COLLISIONSMODCOUNT =
RDBDocumentStore.COLLISIONSMODCOUNT;
+ private static final String MODCOUNT = NodeDocument.MOD_COUNT;
+ private static final String MODIFIED = NodeDocument.MODIFIED_IN_SECS;
+
+ private final RDBDocumentStoreDB dbInfo;
+ private final RDBDocumentSerializer ser;
+ private final int queryHitsLimit, queryTimeLimit;
+
+ public RDBDocumentStoreJDBC(RDBDocumentStoreDB dbInfo,
RDBDocumentSerializer ser, int queryHitsLimit, int queryTimeLimit) {
+ this.dbInfo = dbInfo;
+ this.ser = ser;
+ this.queryHitsLimit = queryHitsLimit;
+ this.queryTimeLimit = queryTimeLimit;
+ }
+
+ public boolean appendingUpdate(Connection connection, RDBTableMetaData
tmd, String id, Long modified,
+ boolean setModifiedConditionally, Boolean hasBinary, Boolean
deletedOnce, Long modcount, Long cmodcount,
+ Long oldmodcount, String appendData) throws SQLException {
+ StringBuilder t = new StringBuilder();
+ t.append("update " + tmd.getName() + " set ");
+ t.append(setModifiedConditionally ? "MODIFIED = case when ? > MODIFIED
then ? else MODIFIED end, " : "MODIFIED = ?, ");
+ t.append("HASBINARY = ?, DELETEDONCE = ?, MODCOUNT = ?, CMODCOUNT = ?,
DSIZE = DSIZE + ?, ");
+ t.append("DATA = " +
this.dbInfo.getConcatQueryString(tmd.getDataLimitInOctets(),
appendData.length()) + " ");
+ t.append("where ID = ?");
+ if (oldmodcount != null) {
+ t.append(" and MODCOUNT = ?");
+ }
+ PreparedStatement stmt = connection.prepareStatement(t.toString());
+ try {
+ int si = 1;
+ stmt.setObject(si++, modified, Types.BIGINT);
+ if (setModifiedConditionally) {
+ stmt.setObject(si++, modified, Types.BIGINT);
+ }
+ stmt.setObject(si++, hasBinary ? 1 : 0, Types.SMALLINT);
+ stmt.setObject(si++, deletedOnce ? 1 : 0, Types.SMALLINT);
+ stmt.setObject(si++, modcount, Types.BIGINT);
+ stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) :
cmodcount, Types.BIGINT);
+ stmt.setObject(si++, 1 + appendData.length(), Types.BIGINT);
+ stmt.setString(si++, "," + appendData);
+ setIdInStatement(tmd, stmt, si++, id);
+
+ if (oldmodcount != null) {
+ stmt.setObject(si++, oldmodcount, Types.BIGINT);
+ }
+ int result = stmt.executeUpdate();
+ if (result != 1) {
+ LOG.debug("DB append update failed for " + tmd.getName() + "/"
+ id + " with oldmodcount=" + oldmodcount);
+ }
+ return result == 1;
+ } finally {
+ stmt.close();
+ }
+ }
+
+ public boolean batchedAppendingUpdate(Connection connection,
RDBTableMetaData tmd, List<String> ids, Long modified,
+ boolean setModifiedConditionally, String appendData) throws
SQLException {
+ StringBuilder t = new StringBuilder();
+ t.append("update " + tmd.getName() + " set ");
+ t.append(setModifiedConditionally ? "MODIFIED = case when ? > MODIFIED
then ? else MODIFIED end, " : "MODIFIED = ?, ");
+ t.append("MODCOUNT = MODCOUNT + 1, DSIZE = DSIZE + ?, ");
+ t.append("DATA = " +
this.dbInfo.getConcatQueryString(tmd.getDataLimitInOctets(),
appendData.length()) + " ");
+ t.append("where ID in (");
+ for (int i = 0; i < ids.size(); i++) {
+ if (i != 0) {
+ t.append(',');
+ }
+ t.append('?');
+ }
+ t.append(")");
+ PreparedStatement stmt = connection.prepareStatement(t.toString());
+ try {
+ int si = 1;
+ stmt.setObject(si++, modified, Types.BIGINT);
+ if (setModifiedConditionally) {
+ stmt.setObject(si++, modified, Types.BIGINT);
+ }
+ stmt.setObject(si++, 1 + appendData.length(), Types.BIGINT);
+ stmt.setString(si++, "," + appendData);
+ for (String id : ids) {
+ setIdInStatement(tmd, stmt, si++, id);
+ }
+ int result = stmt.executeUpdate();
+ if (result != ids.size()) {
+ LOG.debug("DB update failed: only " + result + " of " +
ids.size() + " updated. Table: " + tmd.getName() + ", IDs:"
+ + ids);
+ }
+ return result == ids.size();
+ } finally {
+ stmt.close();
+ }
+ }
+
+ public int delete(Connection connection, RDBTableMetaData tmd,
List<String> ids) throws SQLException {
+ PreparedStatement stmt;
+ int cnt = ids.size();
+
+ if (cnt == 1) {
+ stmt = connection.prepareStatement("delete from " + tmd.getName()
+ " where ID=?");
+ } else {
+ StringBuilder inClause = new StringBuilder();
+ for (int i = 0; i < cnt; i++) {
+ inClause.append('?');
+ if (i != cnt - 1) {
+ inClause.append(',');
+ }
+ }
+ stmt = connection.prepareStatement("delete from " + tmd.getName()
+ " where ID in (" + inClause.toString() + ")");
+ }
+
+ try {
+ for (int i = 0; i < cnt; i++) {
+ setIdInStatement(tmd, stmt, i + 1, ids.get(i));
+ }
+ int result = stmt.executeUpdate();
+ if (result != cnt) {
+ LOG.debug("DB delete failed for " + tmd.getName() + "/" + ids);
+ }
+ return result;
+ } finally {
+ stmt.close();
+ }
+ }
+
    /**
     * Deletes rows, each guarded by optional conditions on the MODIFIED column.
     * <p>
     * For each map entry one {@code ID=?} disjunct is generated; conditions are
     * limited to the MODIFIED key: EQUALS with a Long value (bound as a
     * parameter) and EXISTS (rendered as {@code is not null}). Any other
     * condition is rejected.
     *
     * @param connection connection to use (caller manages commit/close)
     * @param tmd table metadata (name, ID column type)
     * @param toDelete map from document ID to the conditions guarding its deletion
     * @return number of rows actually deleted
     * @throws DocumentStoreException when a condition other than the supported MODIFIED conditions is present
     */
    public int delete(Connection connection, RDBTableMetaData tmd, Map<String, Map<Key, Condition>> toDelete)
            throws SQLException, DocumentStoreException {
        // first pass: build "ID=? [and MODIFIED...]" disjuncts joined by "or"
        String or = "";
        StringBuilder whereClause = new StringBuilder();
        for (Entry<String, Map<Key, Condition>> entry : toDelete.entrySet()) {
            whereClause.append(or);
            or = " or ";
            whereClause.append("ID=?");
            for (Entry<Key, Condition> c : entry.getValue().entrySet()) {
                if (!c.getKey().getName().equals(MODIFIED)) {
                    throw new DocumentStoreException("Unsupported condition: " + c);
                }
                whereClause.append(" and MODIFIED");
                if (c.getValue().type == Condition.Type.EQUALS && c.getValue().value instanceof Long) {
                    whereClause.append("=?");
                } else if (c.getValue().type == Condition.Type.EXISTS) {
                    // EXISTS takes no bound parameter
                    whereClause.append(" is not null");
                } else {
                    throw new DocumentStoreException("Unsupported condition: " + c);
                }
            }
        }

        PreparedStatement stmt = connection.prepareStatement("delete from " + tmd.getName() + " where " + whereClause);
        try {
            // second pass: bind parameters in the same order the clause was built;
            // NOTE(review): this relies on iterating the same (unmodified) map
            // instance yielding the same entry order in both passes
            int i = 1;
            for (Entry<String, Map<Key, Condition>> entry : toDelete.entrySet()) {
                setIdInStatement(tmd, stmt, i++, entry.getKey());
                for (Entry<Key, Condition> c : entry.getValue().entrySet()) {
                    if (c.getValue().type == Condition.Type.EQUALS) {
                        stmt.setLong(i++, (Long) c.getValue().value);
                    }
                }
            }
            return stmt.executeUpdate();
        } finally {
            stmt.close();
        }
    }
+
+ public long determineServerTimeDifferenceMillis(Connection connection,
RDBTableMetaData tmd) {
+ PreparedStatement stmt = null;
+ ResultSet rs = null;
+ long result;
+ try {
+ String t = "select ";
+ if (this.dbInfo.getFetchFirstSyntax() == FETCHFIRSTSYNTAX.TOP) {
+ t += "TOP 1 ";
+ }
+ t += this.dbInfo.getCurrentTimeStampInMsSyntax() + " from " +
tmd.getName();
+ switch (this.dbInfo.getFetchFirstSyntax()) {
+ case LIMIT:
+ t += " LIMIT 1";
+ break;
+ case FETCHFIRST:
+ t += " FETCH FIRST 1 ROWS ONLY";
+ break;
+ default:
+ break;
+ }
+
+ stmt = connection.prepareStatement(t);
+ long start = System.currentTimeMillis();
+ rs = stmt.executeQuery();
+ if (rs.next()) {
+ long roundtrip = System.currentTimeMillis() - start;
+ long serverTime = rs.getTimestamp(1).getTime();
+ result = (start + roundtrip / 2) - serverTime;
+ } else {
+ throw new DocumentStoreException("failed to determine server
timestamp");
+ }
+ return result;
+ } catch (Exception ex) {
+ LOG.error("Trying to determine time difference to server", ex);
+ throw new DocumentStoreException(ex);
+ } finally {
+ closeResultSet(rs);
+ closeStatement(stmt);
+ }
+ }
+
+ public <T extends Document> boolean insert(Connection connection,
RDBTableMetaData tmd, List<T> documents) throws SQLException {
+ PreparedStatement stmt = connection.prepareStatement(
+ "insert into " + tmd.getName() + "(ID, MODIFIED, HASBINARY,
DELETEDONCE, MODCOUNT, CMODCOUNT, DSIZE, DATA, BDATA) "
+ + "values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
+
+ try {
+ for (T document : documents) {
+ String data = this.ser.asString(document);
+ String id = document.getId();
+ Number hasBinary = (Number)
document.get(NodeDocument.HAS_BINARY_FLAG);
+ Boolean deletedOnce = (Boolean)
document.get(NodeDocument.DELETED_ONCE);
+ Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT);
+
+ int si = 1;
+ setIdInStatement(tmd, stmt, si++, id);
+ stmt.setObject(si++, document.get(MODIFIED), Types.BIGINT);
+ stmt.setObject(si++, (hasBinary != null &&
hasBinary.intValue() == NodeDocument.HAS_BINARY_VAL) ? 1 : 0,
+ Types.SMALLINT);
+ stmt.setObject(si++, (deletedOnce != null && deletedOnce) ? 1
: 0, Types.SMALLINT);
+ stmt.setObject(si++, document.get(MODCOUNT), Types.BIGINT);
+ stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) :
cmodcount, Types.BIGINT);
+ stmt.setObject(si++, data.length(), Types.BIGINT);
+ if (data.length() < tmd.getDataLimitInOctets() /
CHAR2OCTETRATIO) {
+ stmt.setString(si++, data);
+ stmt.setBinaryStream(si++, null, 0);
+ } else {
+ stmt.setString(si++, "\"blob\"");
+ byte[] bytes = asBytes(data);
+ stmt.setBytes(si++, bytes);
+ }
+ stmt.addBatch();
+ }
+ int[] results = stmt.executeBatch();
+ boolean success = true;
+ for (int i = 0; i < documents.size(); i++) {
+ int result = results[i];
+ if (result != 1 && result != Statement.SUCCESS_NO_INFO) {
+ LOG.error("DB insert failed for {}: {}", tmd.getName(),
documents.get(i).getId());
+ success = false;
+ }
+ }
+ return success;
+ } finally {
+ stmt.close();
+ }
+ }
+
    /**
     * Queries the table for rows with {@code minId < ID < maxId} (either bound
     * may be null, meaning unbounded), optionally filtered by one indexed
     * property, ordered by ID and limited to {@code limit} rows. Supported
     * indexed properties: MODIFIED (range query, {@code >= startValue}),
     * HAS_BINARY_FLAG and DELETED_ONCE (flag queries; startValue must match the
     * flag's "set" value). Logs a warning (with call stack) when the result
     * exceeds the configured hits or time thresholds.
     *
     * @param connection connection to use (caller manages commit/close)
     * @param tmd table metadata (name, ID column type)
     * @param minId exclusive lower ID bound, or null
     * @param maxId exclusive upper ID bound, or null
     * @param indexedProperty property to filter on, or null
     * @param startValue value for the indexed-property filter
     * @param limit maximum number of rows to return (Integer.MAX_VALUE for no limit)
     * @return the matching rows, never null
     * @throws DocumentStoreException for unsupported property filters or when
     *         returned IDs fall outside the requested range (broken collation)
     */
    @Nonnull
    public List<RDBRow> query(Connection connection, RDBTableMetaData tmd, String minId, String maxId, String indexedProperty,
            long startValue, int limit) throws SQLException {
        long start = System.currentTimeMillis();
        StringBuilder selectClause = new StringBuilder();
        StringBuilder whereClause = new StringBuilder();
        // SQL-Server-style row limiting goes into the select list ("TOP n")
        if (limit != Integer.MAX_VALUE && this.dbInfo.getFetchFirstSyntax() == FETCHFIRSTSYNTAX.TOP) {
            selectClause.append("TOP " + limit + " ");
        }
        selectClause.append("ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA from ").append(tmd.getName());

        // dynamically build where clause
        String whereSep = "";
        if (minId != null) {
            whereClause.append("ID > ?");
            whereSep = " and ";
        }
        if (maxId != null) {
            whereClause.append(whereSep).append("ID < ?");
            whereSep = " and ";
        }

        if (indexedProperty != null) {
            if (MODIFIED.equals(indexedProperty)) {
                // the only filter bound as a parameter (see binding below)
                whereClause.append(whereSep).append("MODIFIED >= ?");
            } else if (NodeDocument.HAS_BINARY_FLAG.equals(indexedProperty)) {
                if (startValue != NodeDocument.HAS_BINARY_VAL) {
                    throw new DocumentStoreException("unsupported value for property " + NodeDocument.HAS_BINARY_FLAG);
                }
                whereClause.append(whereSep).append("HASBINARY = 1");
            } else if (NodeDocument.DELETED_ONCE.equals(indexedProperty)) {
                if (startValue != 1) {
                    throw new DocumentStoreException("unsupported value for property " + NodeDocument.DELETED_ONCE);
                }
                whereClause.append(whereSep).append("DELETEDONCE = 1");
            } else {
                throw new DocumentStoreException("unsupported indexed property: " + indexedProperty);
            }
        }

        StringBuilder query = new StringBuilder();
        query.append("select ").append(selectClause);
        if (whereClause.length() != 0) {
            query.append(" where ").append(whereClause);
        }

        query.append(" order by ID");

        // append the DB-specific row-limiting suffix (TOP was handled above)
        if (limit != Integer.MAX_VALUE) {
            switch (this.dbInfo.getFetchFirstSyntax()) {
            case LIMIT:
                query.append(" LIMIT " + limit);
                break;
            case FETCHFIRST:
                query.append(" FETCH FIRST " + limit + " ROWS ONLY");
                break;
            default:
                break;
            }
        }

        PreparedStatement stmt = connection.prepareStatement(query.toString());
        List<RDBRow> result = new ArrayList<RDBRow>();
        long dataTotal = 0, bdataTotal = 0;
        try {
            // bind parameters in the same order the clause was built
            int si = 1;
            if (minId != null) {
                setIdInStatement(tmd, stmt, si++, minId);
            }
            if (maxId != null) {
                setIdInStatement(tmd, stmt, si++, maxId);
            }

            if (MODIFIED.equals(indexedProperty)) {
                stmt.setLong(si++, startValue);
            }
            if (limit != Integer.MAX_VALUE) {
                // hint to the driver how many rows will actually be consumed
                stmt.setFetchSize(limit);
            }
            ResultSet rs = stmt.executeQuery();
            while (rs.next() && result.size() < limit) {
                String id = getIdFromRS(tmd, rs, 1);

                // sanity check: DB string ordering must agree with Java's,
                // otherwise the collation is broken for this table
                if ((minId != null && id.compareTo(minId) < 0) || (maxId != null && id.compareTo(maxId) > 0)) {
                    throw new DocumentStoreException(
                            "unexpected query result: '" + minId + "' < '" + id + "' < '" + maxId + "' - broken DB collation?");
                }
                long modified = rs.getLong(2);
                long modcount = rs.getLong(3);
                long cmodcount = rs.getLong(4);
                long hasBinary = rs.getLong(5);
                long deletedOnce = rs.getLong(6);
                String data = rs.getString(7);
                byte[] bdata = rs.getBytes(8);
                result.add(new RDBRow(id, hasBinary == 1, deletedOnce == 1, modified, modcount, cmodcount, data, bdata));
                // track payload volume for the diagnostics below
                dataTotal += data.length();
                bdataTotal += bdata == null ? 0 : bdata.length;
            }
        } finally {
            stmt.close();
        }

        // diagnostics: flag queries that return too many rows or take too long;
        // the Exception is only created to capture the caller's stack trace
        long elapsed = System.currentTimeMillis() - start;
        if (this.queryHitsLimit != 0 && result.size() > this.queryHitsLimit) {
            String message = String.format(
                    "Potentially excessive query with %d hits (limited to %d, configured QUERYHITSLIMIT %d), elapsed time %dms, params minid '%s' maxid '%s' indexedProperty %s startValue %d limit %d. Check calling method.",
                    result.size(), limit, this.queryHitsLimit, elapsed, minId, maxId, indexedProperty, startValue, limit);
            LOG.info(message, new Exception("call stack"));
        } else if (this.queryTimeLimit != 0 && elapsed > this.queryTimeLimit) {
            String message = String.format(
                    "Long running query with %d hits (limited to %d), elapsed time %dms (configured QUERYTIMELIMIT %d), params minid '%s' maxid '%s' indexedProperty %s startValue %d limit %d. Read %d chars from DATA and %d bytes from BDATA. Check calling method.",
                    result.size(), limit, elapsed, this.queryTimeLimit, minId, maxId, indexedProperty, startValue, limit, dataTotal,
                    bdataTotal);
            LOG.info(message, new Exception("call stack"));
        }

        return result;
    }
+
    /**
     * Reads a single row by ID. When the caller already holds a cached copy
     * (indicated by {@code lastmodcount != -1}) and the database supports CASE
     * in SELECT, DATA/BDATA are nulled out server-side for an unchanged
     * MODCOUNT, saving transfer of the (potentially large) payload.
     *
     * @param connection connection to use (caller manages commit/close)
     * @param tmd table metadata (name, ID column type)
     * @param id document ID to read
     * @param lastmodcount MODCOUNT of the caller's cached copy, or -1 if none
     * @return the row (DATA/BDATA may be null when the cached copy is current),
     *         or null when the row does not exist or the ID is invalid for this database
     */
    @CheckForNull
    public RDBRow read(Connection connection, RDBTableMetaData tmd, String id, long lastmodcount) throws SQLException {
        PreparedStatement stmt;

        boolean useCaseStatement = lastmodcount != -1 && this.dbInfo.allowsCaseInSelect();
        if (useCaseStatement) {
            // the case statement causes the actual row data not to be
            // sent in case we already have it
            stmt = connection.prepareStatement(
                    "select MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, case MODCOUNT when ? then null else DATA end as DATA, "
                            + "case MODCOUNT when ? then null else BDATA end as BDATA from " + tmd.getName() + " where ID = ?");
        } else {
            // either we don't have a previous version of the document
            // or the database does not support CASE in SELECT
            stmt = connection.prepareStatement("select MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA from "
                    + tmd.getName() + " where ID = ?");
        }

        try {
            int si = 1;
            if (useCaseStatement) {
                // MODCOUNT appears in both CASE expressions, hence bound twice
                stmt.setLong(si++, lastmodcount);
                stmt.setLong(si++, lastmodcount);
            }
            setIdInStatement(tmd, stmt, si, id);

            ResultSet rs = stmt.executeQuery();
            if (rs.next()) {
                long modified = rs.getLong(1);
                long modcount = rs.getLong(2);
                long cmodcount = rs.getLong(3);
                long hasBinary = rs.getLong(4);
                long deletedOnce = rs.getLong(5);
                String data = rs.getString(6);
                byte[] bdata = rs.getBytes(7);
                return new RDBRow(id, hasBinary == 1, deletedOnce == 1, modified, modcount, cmodcount, data, bdata);
            } else {
                // no such row
                return null;
            }
        } catch (SQLException ex) {
            LOG.error("attempting to read " + id + " (id length is " + id.length() + ")", ex);
            // DB2 throws an SQLException for invalid keys; handle this more
            // gracefully (SQLState 22001: string data, right truncation)
            if ("22001".equals(ex.getSQLState())) {
                try {
                    // reset the transaction state before reporting "not found"
                    connection.rollback();
                } catch (SQLException ex2) {
                    LOG.debug("failed to rollback", ex2);
                }
                return null;
            } else {
                throw (ex);
            }
        } finally {
            stmt.close();
        }
    }
+
+ public boolean update(Connection connection, RDBTableMetaData tmd, String
id, Long modified, Boolean hasBinary,
+ Boolean deletedOnce, Long modcount, Long cmodcount, Long
oldmodcount, String data) throws SQLException {
+ String t = "update " + tmd.getName()
+ + " set MODIFIED = ?, HASBINARY = ?, DELETEDONCE = ?, MODCOUNT
= ?, CMODCOUNT = ?, DSIZE = ?, DATA = ?, BDATA = ? where ID = ?";
+ if (oldmodcount != null) {
+ t += " and MODCOUNT = ?";
+ }
+ PreparedStatement stmt = connection.prepareStatement(t);
+ try {
+ int si = 1;
+ stmt.setObject(si++, modified, Types.BIGINT);
+ stmt.setObject(si++, hasBinary ? 1 : 0, Types.SMALLINT);
+ stmt.setObject(si++, deletedOnce ? 1 : 0, Types.SMALLINT);
+ stmt.setObject(si++, modcount, Types.BIGINT);
+ stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) :
cmodcount, Types.BIGINT);
+ stmt.setObject(si++, data.length(), Types.BIGINT);
+
+ if (data.length() < tmd.getDataLimitInOctets() / CHAR2OCTETRATIO) {
+ stmt.setString(si++, data);
+ stmt.setBinaryStream(si++, null, 0);
+ } else {
+ stmt.setString(si++, "\"blob\"");
+ byte[] bytes = asBytes(data);
+ stmt.setBytes(si++, bytes);
+ }
+
+ setIdInStatement(tmd, stmt, si++, id);
+
+ if (oldmodcount != null) {
+ stmt.setObject(si++, oldmodcount, Types.BIGINT);
+ }
+ int result = stmt.executeUpdate();
+ if (result != 1) {
+ LOG.debug("DB update failed for " + tmd.getName() + "/" + id +
" with oldmodcount=" + oldmodcount);
+ }
+ return result == 1;
+ } finally {
+ stmt.close();
+ }
+ }
+
+ private static String getIdFromRS(RDBTableMetaData tmd, ResultSet rs, int
idx) throws SQLException {
+ if (tmd.isIdBinary()) {
+ try {
+ return new String(rs.getBytes(idx), "UTF-8");
+ } catch (UnsupportedEncodingException ex) {
+ LOG.error("UTF-8 not supported??", ex);
+ throw new DocumentStoreException(ex);
+ }
+ } else {
+ return rs.getString(idx);
+ }
+ }
+
+ private static void setIdInStatement(RDBTableMetaData tmd,
PreparedStatement stmt, int idx, String id) throws SQLException {
+ if (tmd.isIdBinary()) {
+ try {
+ stmt.setBytes(idx, id.getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException ex) {
+ LOG.error("UTF-8 not supported??", ex);
+ throw new DocumentStoreException(ex);
+ }
+ } else {
+ stmt.setString(idx, id);
+ }
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
------------------------------------------------------------------------------
svn:eol-style = native