Author: reschke
Date: Fri Mar 28 15:03:59 2014
New Revision: 1582772
URL: http://svn.apache.org/r1582772
Log:
OAK-1533 - make the iterator chunked
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java?rev=1582772&r1=1582771&r2=1582772&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
Fri Mar 28 15:03:59 2014
@@ -25,6 +25,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import javax.sql.DataSource;
@@ -353,38 +354,82 @@ public class RDBBlobStore extends Cachin
@Override
public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws
Exception {
- PreparedStatement prep = null;
+ return new ChunkIdIterator(this.connection, maxLastModifiedTime);
+ }
+
- if (maxLastModifiedTime > 0) {
- prep = connection.prepareStatement(
- "select id from datastore_meta where lastMod <= ?");
- prep.setLong(1, maxLastModifiedTime);
- } else {
- prep = connection.prepareStatement(
- "select id from datastore_meta");
+ /**
+ * Reads chunk IDs in batches.
+ */
+ private static class ChunkIdIterator extends AbstractIterator<String> {
+
+ private long maxLastModifiedTime;
+ private Connection connection;
+ private static int BATCHSIZE = 1024 * 256;
+ private List<String> results = new LinkedList<String>();
+ private String lastId = null;
+
+ public ChunkIdIterator(Connection connection, long
maxLastModifiedTime) {
+ this.maxLastModifiedTime = maxLastModifiedTime;
+ this.connection = connection;
+ }
+
+ @Override
+ protected String computeNext() {
+ if (!results.isEmpty()) {
+ return results.remove(0);
+ } else {
+ // need to refill
+ if (refill()) {
+ return computeNext();
+ } else {
+ return endOfData();
+ }
+ }
}
- final ResultSet rs = prep.executeQuery();
+ private boolean refill() {
+ StringBuffer query = new StringBuffer();
+ query.append("select id from datastore_meta");
+ if (maxLastModifiedTime > 0) {
+ query.append(" where lastMod <= ?");
+ if (lastId != null) {
+ query.append(" and id > ?");
+ }
+ } else {
+ if (lastId != null) {
+ query.append(" where id > ?");
+ }
+ }
+ query.append(" order by id limit " + BATCHSIZE);
- return new AbstractIterator<String>() {
- protected String computeNext() {
+ try {
try {
- if (rs.next()) {
- return rs.getString(1);
- } else {
- rs.close();
+ PreparedStatement prep =
connection.prepareStatement(query.toString());
+ int idx = 1;
+ if (maxLastModifiedTime > 0) {
+ prep.setLong(idx++, maxLastModifiedTime);
}
- } catch (SQLException e) {
- try {
- if ((rs != null) && !rs.isClosed()) {
- rs.close();
- }
- } catch (Exception e2) {
+ if (lastId != null) {
+ prep.setString(idx++, lastId);
+ }
+
+ ResultSet rs = prep.executeQuery();
+ while (rs.next()) {
+ lastId = rs.getString(1);
+ results.add(lastId);
}
- throw new RuntimeException(e);
+ return !results.isEmpty();
+ } finally {
+ connection.commit();
+ }
+ } catch (SQLException ex) {
+ try {
+ connection.rollback();
+ } catch (SQLException e) {
}
- return endOfData();
+ return false;
}
- };
+ }
}
}