This is an automated email from the ASF dual-hosted git repository. daim pushed a commit to branch DetailedGC/OAK-10199 in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit e1319b2ffaf813d7b9afef6169d5d6d3ed71edcf Author: Rishabh Kumar <[email protected]> AuthorDate: Wed Jul 12 21:05:02 2023 +0530 OAK-10199 : fixed query to avoid skipping documents with greater _modified timestamp --- .../plugins/document/VersionGarbageCollector.java | 8 ++-- .../document/mongo/MongoVersionGCSupport.java | 1 - .../plugins/document/rdb/RDBVersionGCSupport.java | 52 ++++++++++++++++++---- .../oak/plugins/document/VersionGCInitTest.java | 5 --- 4 files changed, 47 insertions(+), 19 deletions(-) diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java index c7af41b2c4..a2603e16d5 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/VersionGarbageCollector.java @@ -786,8 +786,8 @@ public class VersionGarbageCollector { fromModifiedMs = fromModifiedMs + SECONDS.toMillis(5); foundDoc = true; // to run while loop again } - - // if we are already at last document of current timeStamp, + // if we didn't find any document i.e. either we are already at last document + // of current timeStamp or there is no document for this timeStamp // we need to reset fromId & increment fromModified and check again if (!foundDoc && !Objects.equals(fromId, MIN_ID_VALUE)) { fromId = MIN_ID_VALUE; @@ -941,7 +941,7 @@ public class VersionGarbageCollector { this.ownHeadRevision = headRevision.getRevision(nodeStore.getClusterId()); } - public void collectGarbage(final NodeDocument doc, final GCPhases phases, final Revision revision) { + public void collectGarbage(final NodeDocument doc, final GCPhases phases) { detailedGCStats.documentRead(); monitor.info("Collecting Detailed Garbage for doc [{}]", doc.getId()); @@ -988,8 +988,6 @@ public class VersionGarbageCollector { .filter(p -> !retainPropSet.contains(p)) .mapToInt(x -> { updateOp.remove(x); - setModified(updateOp,revision); - setDeleted(updateOp, revision, false); return 1;}) .sum(); diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java index b9dc2da855..1f6d4bf5f5 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java @@ -49,7 +49,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.regex.Pattern; import com.mongodb.client.MongoCursor; diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBVersionGCSupport.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBVersionGCSupport.java index 13b7ce26de..a37c26a385 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBVersionGCSupport.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBVersionGCSupport.java @@ -16,15 +16,23 @@ */ package org.apache.jackrabbit.oak.plugins.document.rdb; +import static java.util.Comparator.comparing; import static java.util.List.of; +import static java.util.Optional.empty; +import static java.util.Optional.ofNullable; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Stream.concat; +import static java.util.stream.StreamSupport.stream; import static org.apache.jackrabbit.guava.common.collect.Iterables.filter; +import static org.apache.jackrabbit.guava.common.collect.Iterables.size; import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; import static org.apache.jackrabbit.oak.plugins.document.Document.ID; import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MIN_ID_VALUE; import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS; -import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.NULL; import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.getModifiedInSecs; import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.EMPTY_KEY_PATTERN; +import static org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable.wrap; +import static org.apache.jackrabbit.oak.plugins.document.util.Utils.closeIfCloseable; import java.io.Closeable; import java.io.IOException; @@ -33,11 +41,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.apache.jackrabbit.oak.commons.properties.SystemPropertySupplier; import org.apache.jackrabbit.oak.plugins.document.Collection; +import org.apache.jackrabbit.oak.plugins.document.Document; import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException; import org.apache.jackrabbit.oak.plugins.document.NodeDocument; import org.apache.jackrabbit.oak.plugins.document.NodeDocument.SplitDocType; @@ -115,10 +126,36 @@ public class RDBVersionGCSupport extends VersionGCSupport { List<QueryCondition> conditions = of(new QueryCondition(MODIFIED_IN_SECS, "<", getModifiedInSecs(toModified)), new QueryCondition(MODIFIED_IN_SECS, ">=", getModifiedInSecs(fromModified)), new QueryCondition(ID, ">", of(fromId))); + + final List<QueryCondition> c2 = of(new QueryCondition(MODIFIED_IN_SECS, "<", getModifiedInSecs(toModified)), + new QueryCondition(MODIFIED_IN_SECS, ">", getModifiedInSecs(fromModified))); + if (MODE == 1) { - return getIterator(EMPTY_KEY_PATTERN, conditions); + final Iterable<NodeDocument> itr1 = getIterator(EMPTY_KEY_PATTERN, c1); + if (size(itr1) >= limit) { + return itr1; + } + final Iterable<NodeDocument> itr2 = getIterator(EMPTY_KEY_PATTERN, c2); + + final Stream<NodeDocument> s1 = stream(itr1.spliterator(), false); + final Stream<NodeDocument> s2 = stream(itr2.spliterator(), false); + return wrap(concat(s1, s2).sorted((o1, o2) -> comparing(NodeDocument::getModified).thenComparing(Document::getId).compare(o1, o2)).limit(limit).collect(toList()), () -> { + closeIfCloseable(itr1); + closeIfCloseable(itr2); + }); } else { - return store.queryAsIterable(NODES, null, null, EMPTY_KEY_PATTERN, conditions, limit, of(MODIFIED_IN_SECS, ID)); + final Iterable<NodeDocument> itr1 = store.queryAsIterable(NODES, null, null, EMPTY_KEY_PATTERN, c1, limit, of(MODIFIED_IN_SECS, ID)); + if (size(itr1) >= limit) { + return itr1; + } + final Iterable<NodeDocument> itr2 = store.queryAsIterable(NODES, null, null, EMPTY_KEY_PATTERN, c2, limit, of(MODIFIED_IN_SECS, ID)); + + final Stream<NodeDocument> s1 = stream(itr1.spliterator(), false); + final Stream<NodeDocument> s2 = stream(itr2.spliterator(), false); + return wrap(concat(s1, s2).sorted((o1, o2) -> comparing(NodeDocument::getModified).thenComparing(Document::getId).compare(o1, o2)).limit(limit).collect(toList()), () -> { + closeIfCloseable(itr1); + closeIfCloseable(itr2); + }); } } @@ -283,20 +320,19 @@ public class RDBVersionGCSupport extends VersionGCSupport { * @return the timestamp of the oldest modified document. */ @Override - public NodeDocument getOldestModifiedDoc(Clock clock) { - NodeDocument doc = NULL; + public Optional<NodeDocument> getOldestModifiedDoc(Clock clock) { LOG.info("getOldestModifiedDoc() <- start"); Iterable<NodeDocument> modifiedDocs = null; try { modifiedDocs = getModifiedDocs(0L, clock.getTime(), 1, MIN_ID_VALUE); - doc = modifiedDocs.iterator().hasNext() ? modifiedDocs.iterator().next() : NULL; + return modifiedDocs.iterator().hasNext() ? ofNullable(modifiedDocs.iterator().next()) : empty(); } catch (DocumentStoreException ex) { LOG.error("getOldestModifiedDoc()", ex); } finally { - Utils.closeIfCloseable(modifiedDocs); + closeIfCloseable(modifiedDocs); } - return doc; + return empty(); } @Override diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCInitTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCInitTest.java index e333ee765e..561e7426a9 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCInitTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/VersionGCInitTest.java @@ -83,14 +83,9 @@ public class VersionGCInitTest { vgc = store.find(SETTINGS, SETTINGS_COLLECTION_ID); assertNotNull(vgc); -<<<<<<< HEAD assertEquals(stats.oldestModifiedDocTimeStamp, vgc.get(SETTINGS_COLLECTION_DETAILED_GC_TIMESTAMP_PROP)); assertEquals(stats.oldestModifiedDocId, vgc.get(SETTINGS_COLLECTION_DETAILED_GC_DOCUMENT_ID_PROP)); assertEquals(MIN_ID_VALUE, vgc.get(SETTINGS_COLLECTION_DETAILED_GC_DOCUMENT_ID_PROP)); -======= - assertEquals(40_000L, vgc.get(SETTINGS_COLLECTION_DETAILED_GC_TIMESTAMP_PROP)); - assertEquals("1:/node", vgc.get(SETTINGS_COLLECTION_DETAILED_GC_DOCUMENT_ID_PROP)); ->>>>>>> d3b73cd921 (OAK-10199 : added unit cases to handle concurrent prop update and escaped properties update) } @Test
