Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1827987&r1=1827986&r2=1827987&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original) +++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Thu Mar 29 13:06:10 2018 @@ -46,8 +46,11 @@ import com.google.common.collect.Iterabl import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.util.concurrent.UncheckedExecutionException; +import com.mongodb.Block; +import com.mongodb.DBObject; +import com.mongodb.MongoBulkWriteException; +import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; -import com.mongodb.QueryOperators; import com.mongodb.ReadPreference; import org.apache.jackrabbit.oak.cache.CacheStats; @@ -79,26 +82,30 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.document.util.Utils; import org.apache.jackrabbit.oak.stats.Clock; import org.apache.jackrabbit.oak.commons.PerfLogger; +import org.bson.conversions.Bson; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Function; import com.google.common.collect.Maps; import com.mongodb.BasicDBObject; -import com.mongodb.BulkWriteError; -import com.mongodb.BulkWriteException; -import com.mongodb.BulkWriteOperation; -import com.mongodb.BulkWriteResult; -import com.mongodb.BulkWriteUpsert; -import com.mongodb.CommandResult; -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; 
import com.mongodb.MongoException; -import com.mongodb.QueryBuilder; import com.mongodb.WriteConcern; -import com.mongodb.WriteResult; +import com.mongodb.bulk.BulkWriteError; +import com.mongodb.bulk.BulkWriteResult; +import com.mongodb.bulk.BulkWriteUpsert; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.BulkWriteOptions; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.FindOneAndUpdateOptions; +import com.mongodb.client.model.ReturnDocument; +import com.mongodb.client.model.UpdateOneModel; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.WriteModel; +import com.mongodb.client.result.UpdateResult; import static com.google.common.base.Predicates.in; import static com.google.common.base.Predicates.not; @@ -126,7 +133,7 @@ public class MongoDocumentStore implemen LoggerFactory.getLogger(MongoDocumentStore.class.getName() + ".perf")); - private static final DBObject BY_ID_ASC = new BasicDBObject(Document.ID, 1); + private static final Bson BY_ID_ASC = new BasicDBObject(Document.ID, 1); enum DocumentReadPreference { PRIMARY, @@ -137,12 +144,13 @@ public class MongoDocumentStore implemen public static final int IN_CLAUSE_BATCH_SIZE = 500; - private final DBCollection nodes; - private final DBCollection clusterNodes; - private final DBCollection settings; - private final DBCollection journal; + private MongoCollection<BasicDBObject> nodes; + private final MongoCollection<BasicDBObject> clusterNodes; + private final MongoCollection<BasicDBObject> settings; + private final MongoCollection<BasicDBObject> journal; - private final DB db; + private final MongoClient client; + private final MongoDatabase db; private final NodeDocumentCache nodesCache; @@ -240,11 +248,12 @@ public class MongoDocumentStore implemen private final boolean readOnly; - public 
MongoDocumentStore(DB db, MongoDocumentNodeStoreBuilderBase<?> builder) { + public MongoDocumentStore(MongoClient client, String dbName, + MongoDocumentNodeStoreBuilderBase<?> builder) { this.readOnly = builder.getReadOnlyMode(); MongoStatus mongoStatus = builder.getMongoStatus(); if (mongoStatus == null) { - mongoStatus = new MongoStatus(db); + mongoStatus = new MongoStatus(client, dbName); } mongoStatus.checkVersion(); metadata = ImmutableMap.<String,String>builder() @@ -252,12 +261,13 @@ public class MongoDocumentStore implemen .put("version", mongoStatus.getVersion()) .build(); - this.db = db; + this.client = client; + this.db = client.getDatabase(dbName); stats = builder.getDocumentStoreStatsCollector(); - nodes = db.getCollection(Collection.NODES.toString()); - clusterNodes = db.getCollection(Collection.CLUSTER_NODES.toString()); - settings = db.getCollection(Collection.SETTINGS.toString()); - journal = db.getCollection(Collection.JOURNAL.toString()); + nodes = db.getCollection(Collection.NODES.toString(), BasicDBObject.class); + clusterNodes = db.getCollection(Collection.CLUSTER_NODES.toString(), BasicDBObject.class); + settings = db.getCollection(Collection.SETTINGS.toString(), BasicDBObject.class); + journal = db.getCollection(Collection.JOURNAL.toString(), BasicDBObject.class); maxReplicationLagMillis = builder.getMaxReplicationLagMillis(); @@ -265,7 +275,7 @@ public class MongoDocumentStore implemen replicaInfo = null; localChanges = null; } else { - replicaInfo = new ReplicaSetInfo(clock, db, builder.getMongoUri(), estimationPullFrequencyMS, maxReplicationLagMillis, builder.getExecutor()); + replicaInfo = new ReplicaSetInfo(clock, client, dbName, builder.getMongoUri(), estimationPullFrequencyMS, maxReplicationLagMillis, builder.getExecutor()); Thread replicaInfoThread = new Thread(replicaInfo, "MongoDocumentStore replica set info provider"); replicaInfoThread.setDaemon(true); replicaInfoThread.start(); @@ -541,7 +551,7 @@ public class MongoDocumentStore 
implemen @CheckForNull protected <T extends Document> T findUncached(Collection<T> collection, String key, DocumentReadPreference docReadPref) { log("findUncached", key, docReadPref); - DBCollection dbCollection = getDBCollection(collection); + MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); final Stopwatch watch = startWatch(); boolean isSlaveOk = false; boolean docFound = true; @@ -553,13 +563,14 @@ public class MongoDocumentStore implemen isSlaveOk = true; } - DBObject obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, readPreference); + List<BasicDBObject> result = new ArrayList<>(1); + dbCollection.withReadPreference(readPreference).find(getByKeyQuery(key)).into(result); - if(obj == null){ + if(result.isEmpty()) { docFound = false; return null; } - T doc = convertFromDBObject(collection, obj); + T doc = convertFromDBObject(collection, result.get(0)); if (doc != null) { doc.seal(); } @@ -633,12 +644,13 @@ public class MongoDocumentStore implemen int limit, long maxQueryTime) { log("query", fromKey, toKey, indexedProperty, startValue, limit); - DBCollection dbCollection = getDBCollection(collection); - QueryBuilder queryBuilder = QueryBuilder.start(Document.ID); - queryBuilder.greaterThan(fromKey); - queryBuilder.lessThan(toKey); + MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); + + List<Bson> clauses = new ArrayList<>(); + clauses.add(Filters.gt(Document.ID, fromKey)); + clauses.add(Filters.lt(Document.ID, toKey)); - DBObject hint = new BasicDBObject(NodeDocument.ID, 1); + Bson hint = new BasicDBObject(NodeDocument.ID, 1); if (indexedProperty != null) { if (NodeDocument.DELETED_ONCE.equals(indexedProperty)) { @@ -647,11 +659,9 @@ public class MongoDocumentStore implemen "unsupported value for property " + NodeDocument.DELETED_ONCE); } - queryBuilder.and(indexedProperty); - queryBuilder.is(true); + clauses.add(Filters.eq(indexedProperty, true)); } else { - queryBuilder.and(indexedProperty); - 
queryBuilder.greaterThanEquals(startValue); + clauses.add(Filters.gte(indexedProperty, startValue)); if (NodeDocument.MODIFIED_IN_SECS.equals(indexedProperty) && canUseModifiedTimeIdx(startValue)) { @@ -659,7 +669,7 @@ public class MongoDocumentStore implemen } } } - DBObject query = queryBuilder.get(); + Bson query = Filters.and(clauses); String parentId = Utils.getParentIdFromLowerLimit(fromKey); long lockTime = -1; final Stopwatch watch = startWatch(); @@ -671,17 +681,6 @@ public class MongoDocumentStore implemen cacheChangesTracker = nodesCache.registerTracker(fromKey, toKey); } try { - DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC); - if (limit >= 0) { - cursor.limit(limit); - } - if (!disableIndexHint && !hasModifiedIdCompoundIndex) { - cursor.hint(hint); - } - if (maxQueryTime > 0) { - // OAK-2614: set maxTime if maxQueryTimeMS > 0 - cursor.maxTime(maxQueryTime, TimeUnit.MILLISECONDS); - } ReadPreference readPreference = getMongoReadPreference(collection, parentId, null, getDefaultReadPreference(collection)); @@ -689,20 +688,28 @@ public class MongoDocumentStore implemen isSlaveOk = true; LOG.trace("Routing call to secondary for fetching children from [{}] to [{}]", fromKey, toKey); } - - cursor.setReadPreference(readPreference); + FindIterable<BasicDBObject> result = dbCollection + .withReadPreference(readPreference).find(query).sort(BY_ID_ASC); + if (limit >= 0) { + result.limit(limit); + } + if (!disableIndexHint && !hasModifiedIdCompoundIndex) { + result.modifiers(new BasicDBObject("$hint", hint)); + } + if (maxQueryTime > 0) { + // OAK-2614: set maxTime if maxQueryTimeMS > 0 + result.maxTime(maxQueryTime, TimeUnit.MILLISECONDS); + } List<T> list; - try { + try (MongoCursor<BasicDBObject> cursor = result.iterator()) { list = new ArrayList<T>(); for (int i = 0; i < limit && cursor.hasNext(); i++) { - DBObject o = cursor.next(); + BasicDBObject o = cursor.next(); T doc = convertFromDBObject(collection, o); list.add(doc); } resultSize = 
list.size(); - } finally { - cursor.close(); } if (cacheChangesTracker != null) { @@ -729,10 +736,10 @@ public class MongoDocumentStore implemen @Override public <T extends Document> void remove(Collection<T> collection, String key) { log("remove", key); - DBCollection dbCollection = getDBCollection(collection); + MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); Stopwatch watch = startWatch(); try { - dbCollection.remove(getByKeyQuery(key).get()); + dbCollection.deleteOne(getByKeyQuery(key)); } catch (Exception e) { throw DocumentStoreException.convert(e, "Remove failed for " + key); } finally { @@ -744,13 +751,13 @@ public class MongoDocumentStore implemen @Override public <T extends Document> void remove(Collection<T> collection, List<String> keys) { log("remove", keys); - DBCollection dbCollection = getDBCollection(collection); + MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); Stopwatch watch = startWatch(); try { for(List<String> keyBatch : Lists.partition(keys, IN_CLAUSE_BATCH_SIZE)){ - DBObject query = QueryBuilder.start(Document.ID).in(keyBatch).get(); + Bson query = Filters.in(Document.ID, keyBatch); try { - dbCollection.remove(query); + dbCollection.deleteMany(query); } catch (Exception e) { throw DocumentStoreException.convert(e, "Remove failed for " + keyBatch); } finally { @@ -770,24 +777,23 @@ public class MongoDocumentStore implemen public <T extends Document> int remove(Collection<T> collection, Map<String, Long> toRemove) { log("remove", toRemove); int num = 0; - DBCollection dbCollection = getDBCollection(collection); + MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); Stopwatch watch = startWatch(); try { List<String> batchIds = Lists.newArrayList(); - List<DBObject> batch = Lists.newArrayList(); + List<Bson> batch = Lists.newArrayList(); Iterator<Entry<String, Long>> it = toRemove.entrySet().iterator(); while (it.hasNext()) { Entry<String, Long> entry = it.next(); 
Condition c = newEqualsCondition(entry.getValue()); - QueryBuilder query = createQueryForUpdate( - entry.getKey(), Collections.singletonMap(KEY_MODIFIED, c)); + Bson clause = createQueryForUpdate(entry.getKey(), + Collections.singletonMap(KEY_MODIFIED, c)); batchIds.add(entry.getKey()); - batch.add(query.get()); + batch.add(clause); if (!it.hasNext() || batch.size() == IN_CLAUSE_BATCH_SIZE) { - DBObject q = new BasicDBObject(); - q.put(QueryOperators.OR, batch); + Bson query = Filters.or(batch); try { - num += dbCollection.remove(q).getN(); + num += dbCollection.deleteMany(query).getDeletedCount(); } catch (Exception e) { throw DocumentStoreException.convert(e, "Remove failed for " + batch); } finally { @@ -811,14 +817,15 @@ public class MongoDocumentStore implemen throws DocumentStoreException { log("remove", collection, indexedProperty, startValue, endValue); int num = 0; - DBCollection dbCollection = getDBCollection(collection); + MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); Stopwatch watch = startWatch(); try { - QueryBuilder queryBuilder = QueryBuilder.start(indexedProperty); - queryBuilder.greaterThan(startValue); - queryBuilder.lessThan(endValue); + Bson query = Filters.and( + Filters.gt(indexedProperty, startValue), + Filters.lt(indexedProperty, endValue) + ); try { - num = dbCollection.remove(queryBuilder.get()).getN(); + num = (int) Math.min(dbCollection.deleteMany(query).getDeletedCount(), Integer.MAX_VALUE); } catch (Exception e) { throw DocumentStoreException.convert(e, "Remove failed for " + collection + ": " + indexedProperty + " in (" + startValue + ", " + endValue + ")"); @@ -844,10 +851,10 @@ public class MongoDocumentStore implemen UpdateOp updateOp, boolean upsert, boolean checkConditions) { - DBCollection dbCollection = getDBCollection(collection); + MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); // make sure we don't modify the original updateOp updateOp = updateOp.copy(); - DBObject 
update = createUpdate(updateOp, false); + Bson update = createUpdate(updateOp, false); Lock lock = null; if (collection == Collection.NODES) { @@ -873,17 +880,17 @@ public class MongoDocumentStore implemen // no conditions and the check is OK. this avoid an // unnecessary call when the conditions do not match if (!checkConditions || UpdateUtils.checkConditions(cachedDoc, updateOp.getConditions())) { - QueryBuilder query = createQueryForUpdate(updateOp.getId(), + Bson query = createQueryForUpdate(updateOp.getId(), updateOp.getConditions()); // below condition may overwrite a user supplied condition - // on _modCount. This fine, because the conditions were + // on _modCount. This is fine, because the conditions were // already checked against the cached document with the // matching _modCount value. There is no need to check the // user supplied condition on _modCount again on the server - query.and(Document.MOD_COUNT).is(modCount); + query = Filters.and(query, Filters.eq(Document.MOD_COUNT, modCount)); - WriteResult result = dbCollection.update(query.get(), update); - if (result.getN() > 0) { + UpdateResult result = dbCollection.updateOne(query, update); + if (result.getModifiedCount() > 0) { // success, update cached document if (collection == Collection.NODES) { NodeDocument newDoc = (NodeDocument) applyChanges(collection, cachedDoc, updateOp); @@ -897,8 +904,10 @@ public class MongoDocumentStore implemen // conditional update failed or not possible // perform operation and get complete document - QueryBuilder query = createQueryForUpdate(updateOp.getId(), updateOp.getConditions()); - DBObject oldNode = dbCollection.findAndModify(query.get(), null, null /*sort*/, false /*remove*/, update, false /*returnNew*/, upsert); + Bson query = createQueryForUpdate(updateOp.getId(), updateOp.getConditions()); + FindOneAndUpdateOptions options = new FindOneAndUpdateOptions() + .returnDocument(ReturnDocument.BEFORE).upsert(upsert); + BasicDBObject oldNode = 
dbCollection.findOneAndUpdate(query, update, options); if (oldNode == null){ newEntry = true; @@ -1130,17 +1139,15 @@ public class MongoDocumentStore implemen private <T extends Document> Map<String, T> findDocuments(Collection<T> collection, Set<String> keys) { Map<String, T> docs = new HashMap<String, T>(); if (!keys.isEmpty()) { - DBObject[] conditions = new DBObject[keys.size()]; - int i = 0; + List<Bson> conditions = new ArrayList<>(keys.size()); for (String key : keys) { - conditions[i++] = getByKeyQuery(key).get(); + conditions.add(getByKeyQuery(key)); } - QueryBuilder builder = new QueryBuilder(); - builder.or(conditions); - DBCursor cursor = getDBCollection(collection).find(builder.get()); - while (cursor.hasNext()) { - T foundDoc = convertFromDBObject(collection, cursor.next()); + FindIterable<BasicDBObject> cursor = getDBCollection(collection) + .find(Filters.or(conditions)); + for (BasicDBObject doc : cursor) { + T foundDoc = convertFromDBObject(collection, doc); docs.put(foundDoc.getId(), foundDoc); } } @@ -1149,23 +1156,21 @@ public class MongoDocumentStore implemen private <T extends Document> BulkUpdateResult sendBulkUpdate(Collection<T> collection, java.util.Collection<UpdateOp> updateOps, Map<String, T> oldDocs) { - DBCollection dbCollection = getDBCollection(collection); - BulkWriteOperation bulk = dbCollection.initializeUnorderedBulkOperation(); + MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); + List<WriteModel<BasicDBObject>> writes = new ArrayList<>(updateOps.size()); String[] bulkIds = new String[updateOps.size()]; int i = 0; for (UpdateOp updateOp : updateOps) { String id = updateOp.getId(); - QueryBuilder query = createQueryForUpdate(id, updateOp.getConditions()); + Bson query = createQueryForUpdate(id, updateOp.getConditions()); T oldDoc = oldDocs.get(id); - DBObject update; if (oldDoc == null || oldDoc == NodeDocument.NULL) { - query.and(Document.MOD_COUNT).exists(false); - update = createUpdate(updateOp, 
true); + query = Filters.and(query, Filters.exists(Document.MOD_COUNT, false)); + writes.add(new UpdateOneModel<>(query, createUpdate(updateOp, true), new UpdateOptions().upsert(true))); } else { - query.and(Document.MOD_COUNT).is(oldDoc.getModCount()); - update = createUpdate(updateOp, false); + query = Filters.and(query, Filters.eq(Document.MOD_COUNT, oldDoc.getModCount())); + writes.add(new UpdateOneModel<>(query, createUpdate(updateOp, false), new UpdateOptions().upsert(true))); } - bulk.find(query.get()).upsert().updateOne(update); bulkIds[i++] = id; } @@ -1173,8 +1178,9 @@ public class MongoDocumentStore implemen Set<String> failedUpdates = new HashSet<String>(); Set<String> upserts = new HashSet<String>(); try { - bulkResult = bulk.execute(); - } catch (BulkWriteException e) { + bulkResult = dbCollection.bulkWrite(writes, + new BulkWriteOptions().ordered(false)); + } catch (MongoBulkWriteException e) { bulkResult = e.getWriteResult(); for (BulkWriteError err : e.getWriteErrors()) { failedUpdates.add(bulkIds[err.getIndex()]); @@ -1199,18 +1205,18 @@ public class MongoDocumentStore implemen public <T extends Document> boolean create(Collection<T> collection, List<UpdateOp> updateOps) { log("create", updateOps); List<T> docs = new ArrayList<T>(); - DBObject[] inserts = new DBObject[updateOps.size()]; - List<String> ids = Lists.newArrayListWithCapacity(updateOps.size()); + List<BasicDBObject> inserts = new ArrayList<>(updateOps.size()); + List<String> ids = new ArrayList<>(updateOps.size()); - for (int i = 0; i < updateOps.size(); i++) { - inserts[i] = new BasicDBObject(); - UpdateOp update = updateOps.get(i); - inserts[i].put(Document.ID, update.getId()); + for (UpdateOp update : updateOps) { + BasicDBObject doc = new BasicDBObject(); + inserts.add(doc); + doc.put(Document.ID, update.getId()); UpdateUtils.assertUnconditional(update); T target = collection.newDocument(this); UpdateUtils.applyChanges(target, update); docs.add(target); - 
ids.add(updateOps.get(i).getId()); + ids.add(update.getId()); for (Entry<Key, Operation> entry : update.getChanges().entrySet()) { Key k = entry.getKey(); Operation op = entry.getValue(); @@ -1218,28 +1224,20 @@ public class MongoDocumentStore implemen case SET: case MAX: case INCREMENT: { - inserts[i].put(k.toString(), op.value); + doc.put(k.toString(), op.value); break; } case SET_MAP_ENTRY: { Revision r = k.getRevision(); if (r == null) { - throw new IllegalStateException( - "SET_MAP_ENTRY must not have null revision"); + throw new IllegalStateException("SET_MAP_ENTRY must not have null revision"); } - DBObject value = (DBObject) inserts[i].get(k.getName()); + BasicDBObject value = (BasicDBObject) doc.get(k.getName()); if (value == null) { - value = new RevisionEntry(r, op.value); - inserts[i].put(k.getName(), value); - } else if (value.keySet().size() == 1) { - String key = value.keySet().iterator().next(); - Object val = value.get(key); - value = new BasicDBObject(key, val); - value.put(r.toString(), op.value); - inserts[i].put(k.getName(), value); - } else { - value.put(r.toString(), op.value); + value = new BasicDBObject(); + doc.put(k.getName(), value); } + value.put(r.toString(), op.value); break; } case REMOVE: @@ -1248,18 +1246,18 @@ public class MongoDocumentStore implemen break; } } - if (!inserts[i].containsField(Document.MOD_COUNT)) { - inserts[i].put(Document.MOD_COUNT, 1L); + if (!doc.containsKey(Document.MOD_COUNT)) { + doc.put(Document.MOD_COUNT, 1L); target.put(Document.MOD_COUNT, 1L); } } - DBCollection dbCollection = getDBCollection(collection); + MongoCollection<BasicDBObject> dbCollection = getDBCollection(collection); final Stopwatch watch = startWatch(); boolean insertSuccess = false; try { try { - dbCollection.insert(inserts); + dbCollection.insertMany(inserts); if (collection == Collection.NODES) { for (T doc : docs) { nodesCache.putIfAbsent((NodeDocument) doc); @@ -1290,28 +1288,27 @@ public class MongoDocumentStore implemen @Nonnull 
private Map<String, ModificationStamp> getModStamps(Iterable<String> keys) throws MongoException { - QueryBuilder query = QueryBuilder.start(Document.ID).in(keys); // Fetch only the modCount and id final BasicDBObject fields = new BasicDBObject(Document.ID, 1); fields.put(Document.MOD_COUNT, 1); fields.put(NodeDocument.MODIFIED_IN_SECS, 1); - DBCursor cursor = nodes.find(query.get(), fields); - cursor.setReadPreference(ReadPreference.primary()); - Map<String, ModificationStamp> modCounts = Maps.newHashMap(); - for (DBObject obj : cursor) { - String id = (String) obj.get(Document.ID); - Long modCount = Utils.asLong((Number) obj.get(Document.MOD_COUNT)); - if (modCount == null) { - modCount = -1L; - } - Long modified = Utils.asLong((Number) obj.get(NodeDocument.MODIFIED_IN_SECS)); - if (modified == null) { - modified = -1L; - } - modCounts.put(id, new ModificationStamp(modCount, modified)); - } + + nodes.withReadPreference(ReadPreference.primary()) + .find(Filters.in(Document.ID, keys)).projection(fields) + .forEach((Block<BasicDBObject>) obj -> { + String id = (String) obj.get(Document.ID); + Long modCount = Utils.asLong((Number) obj.get(Document.MOD_COUNT)); + if (modCount == null) { + modCount = -1L; + } + Long modified = Utils.asLong((Number) obj.get(NodeDocument.MODIFIED_IN_SECS)); + if (modified == null) { + modified = -1L; + } + modCounts.put(id, new ModificationStamp(modCount, modified)); + }); return modCounts; } @@ -1432,7 +1429,7 @@ public class MongoDocumentStore implemen return map; } - <T extends Document> DBCollection getDBCollection(Collection<T> collection) { + <T extends Document> MongoCollection<BasicDBObject> getDBCollection(Collection<T> collection) { if (collection == Collection.NODES) { return nodes; } else if (collection == Collection.CLUSTER_NODES) { @@ -1447,8 +1444,16 @@ public class MongoDocumentStore implemen } } - private static QueryBuilder getByKeyQuery(String key) { - return QueryBuilder.start(Document.ID).is(key); + MongoDatabase 
getDatabase() { + return db; + } + + MongoClient getClient() { + return client; + } + + private static Bson getByKeyQuery(String key) { + return Filters.eq(Document.ID, key); } @Override @@ -1456,7 +1461,7 @@ public class MongoDocumentStore implemen if (replicaInfo != null) { replicaInfo.stop(); } - nodes.getDB().getMongo().close(); + client.close(); try { nodesCache.close(); } catch (IOException e) { @@ -1478,8 +1483,12 @@ public class MongoDocumentStore implemen @Override public Map<String, String> getStats() { ImmutableMap.Builder<String, String> builder = ImmutableMap.builder(); - List<DBCollection> all = ImmutableList.of(nodes, clusterNodes, settings, journal); - all.forEach(c -> toMapBuilder(builder, c.getStats(), c.getName())); + List<MongoCollection<?>> all = ImmutableList.of(nodes, clusterNodes, settings, journal); + all.forEach(c -> toMapBuilder(builder, + db.runCommand( + new BasicDBObject("collStats", c.getNamespace().getCollectionName()), + BasicDBObject.class), + c.getNamespace().getCollectionName())); return builder.build(); } @@ -1515,27 +1524,31 @@ public class MongoDocumentStore implemen } @Nonnull - private static QueryBuilder createQueryForUpdate(String key, - Map<Key, Condition> conditions) { - QueryBuilder query = getByKeyQuery(key); - + private static Bson createQueryForUpdate(String key, + Map<Key, Condition> conditions) { + Bson query = getByKeyQuery(key); + if (conditions.isEmpty()) { + // special case when there are no conditions + return query; + } + List<Bson> conditionList = new ArrayList<>(conditions.size() + 1); + conditionList.add(query); for (Entry<Key, Condition> entry : conditions.entrySet()) { Key k = entry.getKey(); Condition c = entry.getValue(); switch (c.type) { case EXISTS: - query.and(k.toString()).exists(c.value); + conditionList.add(Filters.exists(k.toString(), Boolean.TRUE.equals(c.value))); break; case EQUALS: - query.and(k.toString()).is(c.value); + conditionList.add(Filters.eq(k.toString(), c.value)); break; case 
NOTEQUALS: - query.and(k.toString()).notEquals(c.value); + conditionList.add(Filters.ne(k.toString(), c.value)); break; } } - - return query; + return Filters.and(conditionList); } /** @@ -1546,7 +1559,7 @@ public class MongoDocumentStore implemen * @return the DBObject. */ @Nonnull - private static DBObject createUpdate(UpdateOp updateOp, boolean includeId) { + private static BasicDBObject createUpdate(UpdateOp updateOp, boolean includeId) { BasicDBObject setUpdates = new BasicDBObject(); BasicDBObject maxUpdates = new BasicDBObject(); BasicDBObject incUpdates = new BasicDBObject(); @@ -1630,13 +1643,13 @@ public class MongoDocumentStore implemen ReadPreference readPref = uri.getOptions().getReadPreference(); if (!readPref.equals(nodes.getReadPreference())) { - nodes.setReadPreference(readPref); - LOG.info("Using ReadPreference {} ",readPref); + nodes = nodes.withReadPreference(readPref); + LOG.info("Using ReadPreference {} ", readPref); } WriteConcern writeConcern = uri.getOptions().getWriteConcern(); if (!writeConcern.equals(nodes.getWriteConcern())) { - nodes.setWriteConcern(writeConcern); + nodes = nodes.withWriteConcern(writeConcern); LOG.info("Using WriteConcern " + writeConcern); } } catch (Exception e) { @@ -1684,34 +1697,39 @@ public class MongoDocumentStore implemen final long start = System.currentTimeMillis(); // assumption here: server returns UTC - ie the returned // date object is correctly taking care of time zones. 
- final CommandResult isMaster = db.command("isMaster"); - if (isMaster == null) { - // OAK-4107 / OAK-4515 : extra safety - LOG.warn("determineServerTimeDifferenceMillis: db.isMaster returned null - cannot determine time difference - assuming 0ms."); - return 0; - } - final Date serverLocalTime = isMaster.getDate("localTime"); - if (serverLocalTime == null) { - // OAK-4107 / OAK-4515 : looks like this can happen - at least - // has been seen once on mongo 3.0.9 - // let's handle this gently and issue a log.warn - // instead of throwing a NPE - LOG.warn("determineServerTimeDifferenceMillis: db.isMaster.localTime returned null - cannot determine time difference - assuming 0ms. " - + "(Result details: server exception=" + isMaster.getException() + ", server error message=" + isMaster.getErrorMessage() + ")", - isMaster.getException()); - return 0; - } - final long end = System.currentTimeMillis(); - - final long midPoint = (start + end) / 2; - final long serverLocalTimeMillis = serverLocalTime.getTime(); - - // the difference should be - // * positive when local instance is ahead - // * and negative when the local instance is behind - final long diff = midPoint - serverLocalTimeMillis; + final BasicDBObject isMaster; + try { + isMaster = db.runCommand(new BasicDBObject("isMaster", 1), BasicDBObject.class); + if (isMaster == null) { + // OAK-4107 / OAK-4515 : extra safety + LOG.warn("determineServerTimeDifferenceMillis: db.isMaster returned null - cannot determine time difference - assuming 0ms."); + return 0; + } + final Date serverLocalTime = isMaster.getDate("localTime"); + if (serverLocalTime == null) { + // OAK-4107 / OAK-4515 : looks like this can happen - at least + // has been seen once on mongo 3.0.9 + // let's handle this gently and issue a log.warn + // instead of throwing a NPE + LOG.warn("determineServerTimeDifferenceMillis: db.isMaster.localTime returned null - cannot determine time difference - assuming 0ms."); + return 0; + } + final long end = 
System.currentTimeMillis(); + + final long midPoint = (start + end) / 2; + final long serverLocalTimeMillis = serverLocalTime.getTime(); + + // the difference should be + // * positive when local instance is ahead + // * and negative when the local instance is behind + final long diff = midPoint - serverLocalTimeMillis; - return diff; + return diff; + } catch (Exception e) { + LOG.warn("determineServerTimeDifferenceMillis: db.isMaster failed with exception - assuming 0ms. " + + "(Result details: server exception=" + e + ", server error message=" + e.getMessage() + ")", e); + } + return 0; } @Override
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreMetrics.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreMetrics.java?rev=1827987&r1=1827986&r2=1827987&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreMetrics.java (original) +++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreMetrics.java Thu Mar 29 13:06:10 2018 @@ -17,11 +17,12 @@ package org.apache.jackrabbit.oak.plugins.document.mongo; import java.util.Set; +import java.util.TreeSet; import com.google.common.collect.ImmutableList; import com.mongodb.BasicDBObject; -import com.mongodb.DB; import com.mongodb.MongoException; +import com.mongodb.client.MongoDatabase; import org.apache.jackrabbit.oak.plugins.document.Collection; import org.apache.jackrabbit.oak.plugins.document.Document; @@ -43,13 +44,13 @@ public final class MongoDocumentStoreMet Collection.NODES, Collection.JOURNAL, Collection.CLUSTER_NODES, Collection.SETTINGS, Collection.BLOBS ); - private final DB db; + private final MongoDatabase db; private final StatisticsProvider statsProvider; public MongoDocumentStoreMetrics(MongoDocumentStore store, StatisticsProvider statsProvider) { - this.db = store.getDBCollection(Collection.NODES).getDB(); + this.db = store.getDatabase(); this.statsProvider = statsProvider; } @@ -65,7 +66,8 @@ public final class MongoDocumentStoreMet private void updateCounters() { LOG.debug("Updating counters"); try { - Set<String> collectionNames = db.getCollectionNames(); + Set<String> collectionNames = new TreeSet<>(); + db.listCollectionNames().into(collectionNames); for (Collection<? 
extends Document> c : COLLECTIONS) { if (!collectionNames.contains(c.toString())) { LOG.debug("Collection {} does not exist", c); @@ -89,7 +91,7 @@ public final class MongoDocumentStoreMet private CollectionStats getStats(Collection<? extends Document> c) throws MongoException { CollectionStats stats = new CollectionStats(); - BasicDBObject result = db.getCollection(c.toString()).getStats(); + BasicDBObject result = new BasicDBObject(db.runCommand(new org.bson.Document("collStats", c.toString()))); stats.count = result.getLong("count", 0); stats.size = result.getLong("size", 0); stats.storageSize = result.getLong("storageSize", 0); Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java?rev=1827987&r1=1827986&r2=1827987&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java (original) +++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java Thu Mar 29 13:06:10 2018 @@ -22,22 +22,21 @@ package org.apache.jackrabbit.oak.plugin import javax.annotation.Nonnull; import static com.google.common.collect.Iterables.transform; -import static com.mongodb.QueryBuilder.start; +import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES; +import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; -import com.google.common.base.Function; import com.mongodb.BasicDBObject; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; -import com.mongodb.QueryBuilder; import com.mongodb.ReadPreference; +import 
com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.Filters; import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo; -import org.apache.jackrabbit.oak.plugins.document.Collection; import org.apache.jackrabbit.oak.plugins.document.MissingLastRevSeeker; import org.apache.jackrabbit.oak.plugins.document.NodeDocument; import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable; import org.apache.jackrabbit.oak.stats.Clock; +import org.bson.conversions.Bson; /** * Mongo specific version of MissingLastRevSeeker which uses mongo queries @@ -56,39 +55,32 @@ public class MongoMissingLastRevSeeker e @Override @Nonnull public CloseableIterable<NodeDocument> getCandidates(final long startTime) { - DBObject query = - start(NodeDocument.MODIFIED_IN_SECS).greaterThanEquals( - NodeDocument.getModifiedInSecs(startTime)) - .get(); - DBObject sortFields = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, 1); - - DBCursor cursor = - getNodeCollection().find(query) - .sort(sortFields) - .setReadPreference(ReadPreference.primary()); - return CloseableIterable.wrap(transform(cursor, new Function<DBObject, NodeDocument>() { - @Override - public NodeDocument apply(DBObject input) { - return store.convertFromDBObject(Collection.NODES, input); - } - }), cursor); + Bson query = Filters.gte(NodeDocument.MODIFIED_IN_SECS, NodeDocument.getModifiedInSecs(startTime)); + Bson sortFields = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, 1); + + FindIterable<BasicDBObject> cursor = getNodeCollection() + .withReadPreference(ReadPreference.primary()) + .find(query).sort(sortFields); + return CloseableIterable.wrap(transform(cursor, + input -> store.convertFromDBObject(NODES, input))); } @Override public boolean isRecoveryNeeded() { - QueryBuilder query = - start(ClusterNodeInfo.STATE).is(ClusterNodeInfo.ClusterNodeState.ACTIVE.name()) - .put(ClusterNodeInfo.LEASE_END_KEY).lessThan(clock.getTime()); + Bson query = 
Filters.and( + Filters.eq(ClusterNodeInfo.STATE, ClusterNodeInfo.ClusterNodeState.ACTIVE.name()), + Filters.lt(ClusterNodeInfo.LEASE_END_KEY, clock.getTime()) + ); - return getClusterNodeCollection().findOne(query.get()) != null; + return getClusterNodeCollection().find(query).iterator().hasNext(); } - private DBCollection getNodeCollection() { - return store.getDBCollection(Collection.NODES); + private MongoCollection<BasicDBObject> getNodeCollection() { + return store.getDBCollection(NODES); } - private DBCollection getClusterNodeCollection() { - return store.getDBCollection(Collection.CLUSTER_NODES); + private MongoCollection<BasicDBObject> getClusterNodeCollection() { + return store.getDBCollection(CLUSTER_NODES); } } Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java?rev=1827987&r1=1827986&r2=1827987&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java (original) +++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java Thu Mar 29 13:06:10 2018 @@ -19,12 +19,12 @@ package org.apache.jackrabbit.oak.plugin import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.mongodb.BasicDBObject; -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; +import com.mongodb.MongoClient; import com.mongodb.MongoQueryException; import com.mongodb.ReadConcern; -import com.mongodb.client.model.DBCollectionFindOptions; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
@@ -42,7 +42,11 @@ public class MongoStatus { .add("host", "process", "connections", "repl", "storageEngine", "mem") .build(); - private final DB db; + private final MongoClient client; + + private final String dbName; + + private final ClusterDescriptionProvider descriptionProvider; private BasicDBObject serverStatus; @@ -54,8 +58,17 @@ public class MongoStatus { private Boolean majorityReadConcernEnabled; - public MongoStatus(@Nonnull DB db) { - this.db = db; + public MongoStatus(@Nonnull MongoClient client, + @Nonnull String dbName) { + this(client, dbName, () -> null); + } + + public MongoStatus(@Nonnull MongoClient client, + @Nonnull String dbName, + @Nonnull ClusterDescriptionProvider descriptionProvider) { + this.client = client; + this.dbName = dbName; + this.descriptionProvider = descriptionProvider; } public void checkVersion() { @@ -101,15 +114,15 @@ public class MongoStatus { // Mongo API doesn't seem to provide an option to check whether the // majority read concern has been enabled, so we have to try to use // it and optionally catch the exception. 
- DBCollection emptyCollection = db.getCollection("emptyCollection-" + System.currentTimeMillis()); - DBCursor cursor = emptyCollection.find(new BasicDBObject(), new DBCollectionFindOptions().readConcern(ReadConcern.MAJORITY)); - try { + MongoCollection<?> emptyCollection = client.getDatabase(dbName) + .getCollection("emptyCollection-" + System.currentTimeMillis()); + try (MongoCursor cursor = emptyCollection + .withReadConcern(ReadConcern.MAJORITY) + .find(new BasicDBObject()).iterator()) { cursor.hasNext(); majorityReadConcernEnabled = true; } catch (MongoQueryException | IllegalArgumentException e) { majorityReadConcernEnabled = false; - } finally { - cursor.close(); } } return majorityReadConcernEnabled; @@ -161,14 +174,16 @@ public class MongoStatus { private BasicDBObject getServerStatus() { if (serverStatus == null) { - serverStatus = db.command("serverStatus"); + serverStatus = client.getDatabase(dbName).runCommand( + new BasicDBObject("serverStatus", 1), BasicDBObject.class); } return serverStatus; } private BasicDBObject getBuildInfo() { if (buildInfo == null) { - buildInfo = db.command("buildInfo"); + buildInfo = client.getDatabase(dbName).runCommand( + new BasicDBObject("buildInfo", 1), BasicDBObject.class); } return buildInfo; } Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoUtils.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoUtils.java?rev=1827987&r1=1827986&r2=1827987&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoUtils.java (original) +++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoUtils.java Thu Mar 29 13:06:10 2018 @@ -20,16 +20,17 @@ import java.util.Set; import 
com.google.common.collect.Sets; import com.mongodb.BasicDBObject; -import com.mongodb.DBCollection; -import com.mongodb.DBObject; import com.mongodb.MongoCommandException; import com.mongodb.MongoException; import com.mongodb.MongoNotPrimaryException; import com.mongodb.MongoSocketException; import com.mongodb.MongoWriteConcernException; import com.mongodb.WriteConcernException; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.IndexOptions; import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException.Type; +import org.bson.Document; import static com.google.common.base.Preconditions.checkArgument; @@ -49,7 +50,7 @@ class MongoUtils { * @param sparse whether the index should be sparse. * @throws MongoException if the operation fails. */ - static void createIndex(DBCollection collection, + static void createIndex(MongoCollection<?> collection, String field, boolean ascending, boolean unique, @@ -73,20 +74,18 @@ class MongoUtils { * arrays have different lengths. * @throws MongoException if the operation fails. */ - static void createIndex(DBCollection collection, + static void createIndex(MongoCollection<?> collection, String[] fields, boolean[] ascending, boolean unique, boolean sparse) throws MongoException { checkArgument(fields.length == ascending.length); - DBObject index = new BasicDBObject(); + BasicDBObject index = new BasicDBObject(); for (int i = 0; i < fields.length; i++) { index.put(fields[i], ascending[i] ? 1 : -1); } - DBObject options = new BasicDBObject(); - options.put("unique", unique); - options.put("sparse", sparse); + IndexOptions options = new IndexOptions().unique(unique).sparse(sparse); collection.createIndex(index, options); } @@ -101,17 +100,16 @@ class MongoUtils { * @param filter the filter expression for the partial index. * @throws MongoException if the operation fails. 
*/ - static void createPartialIndex(DBCollection collection, + static void createPartialIndex(MongoCollection<?> collection, String[] fields, boolean[] ascending, String filter) throws MongoException { checkArgument(fields.length == ascending.length); - DBObject index = new BasicDBObject(); + BasicDBObject index = new BasicDBObject(); for (int i = 0; i < fields.length; i++) { index.put(fields[i], ascending[i] ? 1 : -1); } - DBObject options = new BasicDBObject(); - options.put("partialFilterExpression", BasicDBObject.parse(filter)); + IndexOptions options = new IndexOptions().partialFilterExpression(BasicDBObject.parse(filter)); collection.createIndex(index, options); } @@ -128,11 +126,11 @@ class MongoUtils { * @return {@code true} if the index exists, {@code false} otherwise. * @throws MongoException if the operation fails. */ - static boolean hasIndex(DBCollection collection, String... fields) + static boolean hasIndex(MongoCollection<?> collection, String... fields) throws MongoException { Set<String> uniqueFields = Sets.newHashSet(fields); - for (DBObject info : collection.getIndexInfo()) { - DBObject key = (DBObject) info.get("key"); + for (Document info : collection.listIndexes()) { + Document key = (Document) info.get("key"); Set<String> indexFields = Sets.newHashSet(key.keySet()); if (uniqueFields.equals(indexFields)) { return true; Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java?rev=1827987&r1=1827986&r2=1827987&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java (original) +++ 
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java Thu Mar 29 13:06:10 2018 @@ -19,7 +19,7 @@ package org.apache.jackrabbit.oak.plugins.document.mongo; -import java.util.Iterator; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -31,15 +31,14 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.base.StandardSystemProperty; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.mongodb.BasicDBObject; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; -import com.mongodb.QueryBuilder; +import com.mongodb.Block; import com.mongodb.ReadPreference; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.Filters; + import org.apache.jackrabbit.oak.plugins.document.Document; import org.apache.jackrabbit.oak.plugins.document.NodeDocument; import org.apache.jackrabbit.oak.plugins.document.Revision; @@ -50,15 +49,17 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable; import org.apache.jackrabbit.oak.plugins.document.util.Utils; import org.apache.jackrabbit.oak.stats.Clock; +import org.bson.conversions.Bson; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.transform; -import static com.mongodb.QueryBuilder.start; import static java.util.Collections.singletonList; import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; import static org.apache.jackrabbit.oak.plugins.document.Document.ID; +import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.DELETED_ONCE; +import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS; import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.PATH; import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SD_MAX_REV_TIME_IN_SECS; import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SD_TYPE; @@ -76,7 +77,6 @@ import static org.apache.jackrabbit.oak. public class MongoVersionGCSupport extends VersionGCSupport { private static final Logger LOG = LoggerFactory.getLogger(MongoVersionGCSupport.class); - private static final DBObject SD_TYPE_HINT = start(SD_TYPE).is(1).get(); private final MongoDocumentStore store; @@ -94,24 +94,25 @@ public class MongoVersionGCSupport exten @Override public CloseableIterable<NodeDocument> getPossiblyDeletedDocs(final long fromModified, final long toModified) { //_deletedOnce == true && _modified >= fromModified && _modified < toModified - DBObject query = start(NodeDocument.DELETED_ONCE).is(Boolean.TRUE).put(NodeDocument.MODIFIED_IN_SECS) - .greaterThanEquals(NodeDocument.getModifiedInSecs(fromModified)) - .lessThan(NodeDocument.getModifiedInSecs(toModified)).get(); - DBCursor cursor = getNodeCollection().find(query).setReadPreference(ReadPreference.secondaryPreferred()); - cursor.batchSize(batchSize); + Bson query = Filters.and( + Filters.eq(DELETED_ONCE, true), + Filters.gte(MODIFIED_IN_SECS, getModifiedInSecs(fromModified)), + Filters.lt(MODIFIED_IN_SECS, getModifiedInSecs(toModified)) + ); + FindIterable<BasicDBObject> cursor = getNodeCollection() + .withReadPreference(ReadPreference.secondaryPreferred()) + .find(query).batchSize(batchSize); - return CloseableIterable.wrap(transform(cursor, new Function<DBObject, NodeDocument>() { - @Override - public NodeDocument apply(DBObject input) { - return store.convertFromDBObject(NODES, input); - } - }), cursor); + return CloseableIterable.wrap(transform(cursor, + input -> 
store.convertFromDBObject(NODES, input))); } @Override public long getDeletedOnceCount() { - DBObject query = start(NodeDocument.DELETED_ONCE).is(Boolean.TRUE).get(); - return getNodeCollection().count(query, ReadPreference.secondaryPreferred()); + Bson query = Filters.eq(DELETED_ONCE, Boolean.TRUE); + return getNodeCollection() + .withReadPreference(ReadPreference.secondaryPreferred()) + .count(query); } @Override @@ -128,9 +129,9 @@ public class MongoVersionGCSupport exten final long oldestRevTimeStamp) { return filter(transform(getNodeCollection().find( createQuery(gcTypes, sweepRevs, oldestRevTimeStamp)), - new Function<DBObject, NodeDocument>() { + new Function<BasicDBObject, NodeDocument>() { @Override - public NodeDocument apply(DBObject input) { + public NodeDocument apply(BasicDBObject input) { return store.convertFromDBObject(NODES, input); } }), new Predicate<NodeDocument>() { @@ -144,102 +145,96 @@ public class MongoVersionGCSupport exten @Override public long getOldestDeletedOnceTimestamp(Clock clock, long precisionMs) { LOG.debug("getOldestDeletedOnceTimestamp() <- start"); - DBObject query = start(NodeDocument.DELETED_ONCE).is(Boolean.TRUE).get(); - DBCursor cursor = getNodeCollection().find(query).sort(start(NodeDocument.MODIFIED_IN_SECS).is(1).get()).limit(1); - CloseableIterable<NodeDocument> results = CloseableIterable.wrap(transform(cursor, new Function<DBObject, NodeDocument>() { + Bson query = Filters.eq(DELETED_ONCE, Boolean.TRUE); + Bson sort = Filters.eq(MODIFIED_IN_SECS, 1); + List<Long> result = new ArrayList<>(1); + getNodeCollection().find(query).sort(sort).limit(1).forEach( + new Block<BasicDBObject>() { @Override - public NodeDocument apply(DBObject input) { - return store.convertFromDBObject(NODES, input); - } - }), cursor); - try { - Iterator<NodeDocument> i = results.iterator(); - if (i.hasNext()) { - NodeDocument doc = i.next(); + public void apply(BasicDBObject document) { + NodeDocument doc = store.convertFromDBObject(NODES, 
document); long modifiedMs = doc.getModified() * TimeUnit.SECONDS.toMillis(1); if (LOG.isDebugEnabled()) { LOG.debug("getOldestDeletedOnceTimestamp() -> {}", Utils.timestampToString(modifiedMs)); } - return modifiedMs; + result.add(modifiedMs); } + }); + if (result.isEmpty()) { + LOG.debug("getOldestDeletedOnceTimestamp() -> none found, return current time"); + result.add(clock.getTime()); } - finally { - Utils.closeIfCloseable(results); - } - LOG.debug("getOldestDeletedOnceTimestamp() -> none found, return current time"); - return clock.getTime(); + return result.get(0); } - private DBObject createQuery(Set<SplitDocType> gcTypes, + private Bson createQuery(Set<SplitDocType> gcTypes, RevisionVector sweepRevs, long oldestRevTimeStamp) { List<Integer> gcTypeCodes = Lists.newArrayList(); - QueryBuilder orClause = start(); + List<Bson> orClauses = Lists.newArrayList(); for(SplitDocType type : gcTypes) { gcTypeCodes.add(type.typeCode()); - for (DBObject query : queriesForType(type, sweepRevs)) { - orClause.or(query); + for (Bson query : queriesForType(type, sweepRevs)) { + orClauses.add(query); } } - return start() - .and( - start(SD_TYPE).in(gcTypeCodes).get(), - orClause.get(), - start(NodeDocument.SD_MAX_REV_TIME_IN_SECS) - .lessThan(NodeDocument.getModifiedInSecs(oldestRevTimeStamp)) - .get() - ).get(); + return Filters.and( + Filters.in(SD_TYPE, gcTypeCodes), + Filters.or(orClauses), + Filters.lt(SD_MAX_REV_TIME_IN_SECS, getModifiedInSecs(oldestRevTimeStamp)) + ); } @Nonnull - private Iterable<DBObject> queriesForType(SplitDocType type, RevisionVector sweepRevs) { + private Iterable<Bson> queriesForType(SplitDocType type, RevisionVector sweepRevs) { if (type != DEFAULT_NO_BRANCH) { - return singletonList(start(SD_TYPE).is(type.typeCode()).get()); + return singletonList(Filters.eq(SD_TYPE, type.typeCode())); } // default_no_branch split type is special because we can // only remove those older than sweep rev - List<DBObject> queries = Lists.newArrayList(); + 
List<Bson> queries = Lists.newArrayList(); for (Revision r : sweepRevs) { String idSuffix = Utils.getPreviousIdFor("/", r, 0); idSuffix = idSuffix.substring(idSuffix.lastIndexOf('-')); // id/path constraint for previous documents - QueryBuilder idPathClause = start(); - idPathClause.or(start(ID).regex(Pattern.compile(".*" + idSuffix)).get()); - // previous documents with long paths do not have a '-' in the id - idPathClause.or(start(ID).regex(Pattern.compile("[^-]*")) - .and(PATH).regex(Pattern.compile(".*" + idSuffix)).get()); - - queries.add(start(SD_TYPE).is(type.typeCode()) - .and(idPathClause.get()) - .and(SD_MAX_REV_TIME_IN_SECS).lessThan(getModifiedInSecs(r.getTimestamp())) - .get()); + Bson idPathClause = Filters.or( + Filters.regex(ID, Pattern.compile(".*" + idSuffix)), + // previous documents with long paths do not have a '-' in the id + Filters.and( + Filters.regex(ID, Pattern.compile("[^-]*")), + Filters.regex(PATH, Pattern.compile(".*" + idSuffix)) + ) + ); + + queries.add(Filters.and( + Filters.eq(SD_TYPE, type.typeCode()), + idPathClause, + Filters.lt(SD_MAX_REV_TIME_IN_SECS, getModifiedInSecs(r.getTimestamp())) + )); } return queries; } - private void logSplitDocIdsTobeDeleted(DBObject query) { + private void logSplitDocIdsTobeDeleted(Bson query) { // Fetch only the id final BasicDBObject keys = new BasicDBObject(Document.ID, 1); - List<String> ids; - DBCursor cursor = getNodeCollection().find(query, keys) - .setReadPreference(store.getConfiguredReadPreference(NODES)); - try { - ids = ImmutableList.copyOf(Iterables.transform(cursor, new Function<DBObject, String>() { - @Override - public String apply(DBObject input) { - return (String) input.get(Document.ID); - } - })); - } finally { - cursor.close(); - } + List<String> ids = new ArrayList<>(); + getNodeCollection() + .withReadPreference(store.getConfiguredReadPreference(NODES)) + .find(query).projection(keys) + .forEach((Block<BasicDBObject>) doc -> ids.add(getID(doc))); + StringBuilder sb = new 
StringBuilder("Split documents with following ids were deleted as part of GC \n"); Joiner.on(StandardSystemProperty.LINE_SEPARATOR.value()).appendTo(sb, ids); LOG.debug(sb.toString()); } - private DBCollection getNodeCollection(){ + private static String getID(BasicDBObject document) { + return String.valueOf(document.get(Document.ID)); + } + + private MongoCollection<BasicDBObject> getNodeCollection(){ return store.getDBCollection(NODES); } @@ -267,7 +262,7 @@ public class MongoVersionGCSupport exten @Override protected int deleteSplitDocuments() { - DBObject query = createQuery(gcTypes, sweepRevs, oldestRevTimeStamp); + Bson query = createQuery(gcTypes, sweepRevs, oldestRevTimeStamp); if(LOG.isDebugEnabled()){ //if debug level logging is on then determine the id of documents to be deleted @@ -275,7 +270,7 @@ public class MongoVersionGCSupport exten logSplitDocIdsTobeDeleted(query); } - return getNodeCollection().remove(query).getN(); + return (int) getNodeCollection().deleteMany(query).getDeletedCount(); } } } Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java?rev=1827987&r1=1827986&r2=1827987&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java (original) +++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java Thu Mar 29 13:06:10 2018 @@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory; import com.mongodb.BasicDBObject; import com.mongodb.DB; +import com.mongodb.MongoClient; import com.mongodb.MongoException; import com.mongodb.ReadPreference; @@ -85,13 +86,13 @@ public 
class ReplicaSetInfo implements R private volatile boolean stop; - public ReplicaSetInfo(Clock clock, DB db, String originalMongoUri, long pullFrequencyMillis, long maxReplicationLagMillis, Executor executor) { + public ReplicaSetInfo(Clock clock, MongoClient client, String dbName, String originalMongoUri, long pullFrequencyMillis, long maxReplicationLagMillis, Executor executor) { this.executor = executor; this.clock = clock; - this.adminDb = db.getSisterDB("admin"); + this.adminDb = client.getDB("admin"); this.pullFrequencyMillis = pullFrequencyMillis; this.maxReplicationLagMillis = maxReplicationLagMillis; - this.nodeCollections = new NodeCollectionProvider(originalMongoUri, db.getName()); + this.nodeCollections = new NodeCollectionProvider(originalMongoUri, dbName); } public void addListener(ReplicaSetInfoListener listener) { Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/CloseableIterable.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/CloseableIterable.java?rev=1827987&r1=1827986&r2=1827987&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/CloseableIterable.java (original) +++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/CloseableIterable.java Thu Mar 29 13:06:10 2018 @@ -23,32 +23,41 @@ import java.io.Closeable; import java.io.IOException; import java.util.Iterator; +import javax.annotation.Nonnull; + +import com.google.common.io.Closer; + public class CloseableIterable<T> implements Iterable<T>, Closeable { private final Iterable<T> iterable; - private final Closeable closeable; + private final Closer closer = Closer.create(); - public static <T> CloseableIterable<T> wrap(Iterable<T> iterable, Closeable 
closeable){ + public static <T> CloseableIterable<T> wrap(Iterable<T> iterable, Closeable closeable) { return new CloseableIterable<T>(iterable, closeable); } - public static <T> CloseableIterable<T> wrap(Iterable<T> iterable){ + public static <T> CloseableIterable<T> wrap(Iterable<T> iterable) { return new CloseableIterable<T>(iterable, null); } public CloseableIterable(Iterable<T> iterable, Closeable closeable) { this.iterable = iterable; - this.closeable = closeable; + if (closeable != null) { + this.closer.register(closeable); + } } @Override public void close() throws IOException { - if(closeable != null){ - closeable.close(); - } + closer.close(); } + @Nonnull @Override public Iterator<T> iterator() { - return iterable.iterator(); + Iterator<T> it = iterable.iterator(); + if (it instanceof Closeable) { + closer.register((Closeable) it); + } + return it; } } Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java?rev=1827987&r1=1827986&r2=1827987&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java (original) +++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java Thu Mar 29 13:06:10 2018 @@ -24,6 +24,7 @@ import javax.annotation.Nonnull; import com.google.common.base.Objects; import com.google.common.collect.ImmutableSet; import com.mongodb.DB; +import com.mongodb.Mongo; import com.mongodb.MongoClient; import com.mongodb.MongoClientOptions; import com.mongodb.MongoClientURI; @@ -31,6 +32,7 @@ import com.mongodb.MongoException; import com.mongodb.ReadConcern; import com.mongodb.ReadConcernLevel; 
import com.mongodb.WriteConcern; +import com.mongodb.client.MongoDatabase; import static com.google.common.base.Preconditions.checkNotNull; @@ -84,6 +86,13 @@ public class MongoConnection { } /** + * @return the {@link MongoClient} for this connection. + */ + public MongoClient getMongoClient() { + return mongo; + } + + /** * Returns the {@link DB} as passed in the URI of the constructor. * * @return The {@link DB}. @@ -93,6 +102,16 @@ public class MongoConnection { } /** + * Returns the {@link MongoDatabase} as passed in the URI of the + * constructor. + * + * @return the {@link MongoDatabase}. + */ + public MongoDatabase getDatabase() { + return mongo.getDatabase(mongoURI.getDatabase()); + } + + /** * Returns the {@link DB} with the given name. * * @return The {@link DB}. @@ -102,6 +121,22 @@ public class MongoConnection { } /** + * Returns the {@link MongoDatabase} with the given name. + * + * @return The {@link MongoDatabase}. + */ + public MongoDatabase getDatabase(@Nonnull String name) { + return mongo.getDatabase(name); + } + + /** + * @return the database name specified in the URI. + */ + public String getDBName() { + return mongoURI.getDatabase(); + } + + /** * Closes the underlying Mongo instance */ public void close() { @@ -171,10 +206,25 @@ public class MongoConnection { * * @param db the connection to MongoDB. * @return the default write concern to use for Oak. + * @deprecated use {@link #getDefaultWriteConcern(Mongo)} instead. */ public static WriteConcern getDefaultWriteConcern(@Nonnull DB db) { + return getDefaultWriteConcern(db.getMongo()); + } + + /** + * Returns the default write concern depending on MongoDB deployment. + * <ul> + * <li>{@link WriteConcern#MAJORITY}: for a MongoDB replica set</li> + * <li>{@link WriteConcern#ACKNOWLEDGED}: for single MongoDB instance</li> + * </ul> + * + * @param client the connection to MongoDB. + * @return the default write concern to use for Oak. 
+ */ + public static WriteConcern getDefaultWriteConcern(@Nonnull Mongo client) { WriteConcern w; - if (checkNotNull(db).getMongo().getReplicaSetStatus() != null) { + if (client.getReplicaSetStatus() != null) { w = WriteConcern.MAJORITY; } else { w = WriteConcern.ACKNOWLEDGED; @@ -191,6 +241,7 @@ public class MongoConnection { * * @param db the connection to MongoDB. * @return the default write concern to use for Oak. + * @deprecated use {@link #getDefaultReadConcern(Mongo, MongoDatabase)} instead. */ public static ReadConcern getDefaultReadConcern(@Nonnull DB db) { ReadConcern r; @@ -203,13 +254,44 @@ public class MongoConnection { } /** + * Returns the default read concern depending on MongoDB deployment. + * <ul> + * <li>{@link ReadConcern#MAJORITY}: for a MongoDB replica set with w=majority</li> + * <li>{@link ReadConcern#LOCAL}: for other cases</li> + * </ul> + * + * @param db the connection to MongoDB. + * @return the default read concern to use for Oak. + */ + public static ReadConcern getDefaultReadConcern(@Nonnull Mongo client, + @Nonnull MongoDatabase db) { + ReadConcern r; + if (checkNotNull(client).getReplicaSetStatus() != null && isMajorityWriteConcern(db)) { + r = ReadConcern.MAJORITY; + } else { + r = ReadConcern.LOCAL; + } + return r; + } + + /** * Returns true if the majority write concern is used for the given DB. * * @param db the connection to MongoDB. * @return true if the majority write concern has been configured; false otherwise */ public static boolean isMajorityWriteConcern(@Nonnull DB db) { - return "majority".equals(db.getWriteConcern().getWObject()); + return WriteConcern.MAJORITY.getWString().equals(db.getWriteConcern().getWObject()); + } + + /** + * Returns true if the majority write concern is used for the given DB. + * + * @param db the connection to MongoDB.
+ * @return true if the majority write concern has been configured; false otherwise + */ + public static boolean isMajorityWriteConcern(@Nonnull MongoDatabase db) { + return WriteConcern.MAJORITY.getWString().equals(db.getWriteConcern().getWObject()); } /** @@ -219,9 +301,25 @@ public class MongoConnection { * * @param db the database. * @return whether the write concern is sufficient. + * @deprecated use {@link #isSufficientWriteConcern(Mongo, WriteConcern)} + * instead. */ public static boolean hasSufficientWriteConcern(@Nonnull DB db) { - Object wObj = checkNotNull(db).getWriteConcern().getWObject(); + return isSufficientWriteConcern(checkNotNull(db).getMongo(), db.getWriteConcern()); + } + + /** + * Returns {@code true} if the given write concern is sufficient for Oak. On + * a replica set Oak expects at least w=2. For a single MongoDB node + * deployment w=1 is sufficient. + * + * @param client the client. + * @param wc the write concern. + * @return whether the write concern is sufficient. + */ + public static boolean isSufficientWriteConcern(@Nonnull Mongo client, + @Nonnull WriteConcern wc) { + Object wObj = checkNotNull(wc).getWObject(); int w; if (wObj instanceof Number) { w = ((Number) wObj).intValue(); @@ -233,9 +331,9 @@ public class MongoConnection { w = 2; } else { throw new IllegalArgumentException( - "Unknown write concern: " + db.getWriteConcern()); + "Unknown write concern: " + wc); } - if (db.getMongo().getReplicaSetStatus() != null) { + if (client.getReplicaSetStatus() != null) { return w >= 2; } else { return w >= 1; @@ -249,10 +347,26 @@ public class MongoConnection { * * @param db the database. * @return whether the read concern is sufficient. + * @deprecated use {@link #isSufficientReadConcern(Mongo, ReadConcern)} + * instead. 
*/ public static boolean hasSufficientReadConcern(@Nonnull DB db) { - ReadConcernLevel r = readConcernLevel(checkNotNull(db).getReadConcern()); - if (db.getMongo().getReplicaSetStatus() == null) { + return isSufficientReadConcern(checkNotNull(db).getMongo(), db.getReadConcern()); + } + + /** + * Returns {@code true} if the given read concern is sufficient for Oak. On + * a replica set Oak expects majority or linearizable. For a single MongoDB node + * deployment local is sufficient. + * + * @param client the client. + * @param rc the read concern. + * @return whether the read concern is sufficient. + */ + public static boolean isSufficientReadConcern(@Nonnull Mongo client, + @Nonnull ReadConcern rc) { + ReadConcernLevel r = readConcernLevel(checkNotNull(rc)); + if (client.getReplicaSetStatus() == null) { return true; } else { return REPLICA_RC.contains(r);
