This is an automated email from the ASF dual-hosted git repository.

jsedding pushed a commit to branch jsedding/OAK-11964-refactor-parallel-compaction
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit ce0a520c0fa33eb086ae966fbbf18d6ca40e24b2
Author: Julian Sedding <[email protected]>
AuthorDate: Mon Sep 29 16:15:01 2025 +0200

    OAK-11964 - refactor parallel compaction inheritance
---
 .../oak/segment/CheckpointCompactor.java            | 15 ++---
 .../jackrabbit/oak/segment/ClassicCompactor.java    | 48 ++++++++++++----
 .../jackrabbit/oak/segment/ParallelCompactor.java   | 65 +++++++---------------
 .../segment/file/AbstractCompactionStrategy.java    |  9 +--
 .../CheckpointCompactorExternalBlobTest.java        |  2 +-
 .../oak/segment/CheckpointCompactorTest.java        |  2 +-
 .../segment/ParallelCompactorExternalBlobTest.java  |  5 +-
 .../oak/segment/ParallelCompactorTest.java          |  5 +-
 8 files changed, 74 insertions(+), 77 deletions(-)

diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
index 4d0e2cf87b..2385075503 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
@@ -38,8 +38,6 @@ import org.apache.jackrabbit.oak.commons.Buffer;
 import org.apache.jackrabbit.oak.commons.conditions.Validate;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryChildNodeEntry;
 import org.apache.jackrabbit.oak.segment.file.CompactedNodeState;
-import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
-import org.apache.jackrabbit.oak.segment.file.CompactionWriter;
 import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
@@ -71,16 +69,13 @@ public class CheckpointCompactor extends Compactor {
      * Create a new instance based on the passed arguments.
      *
      * @param gcListener listener receiving notifications about the garbage collection process
-     * @param writer     segment writer used to serialise to segments
-     * @param compactionMonitor notification call back for each compacted nodes,
-     *                          properties, and binaries
+     * @param compactor  the delegate compactor to use for the actual compaction work
      */
     public CheckpointCompactor(
             @NotNull GCMonitor gcListener,
-            @NotNull CompactionWriter writer,
-            @NotNull GCNodeWriteMonitor compactionMonitor) {
+            @NotNull ClassicCompactor compactor) {
         this.gcListener = gcListener;
-        this.compactor = new ClassicCompactor(writer, compactionMonitor);
+        this.compactor = compactor;
     }
 
     @Override
@@ -240,7 +235,7 @@ public class CheckpointCompactor extends Compactor {
     /**
      * Delegate compaction to another, usually simpler, implementation.
      */
-    protected @Nullable CompactedNodeState compactDownWithDelegate(
+    private @Nullable CompactedNodeState compactDownWithDelegate(
             @NotNull NodeState before,
             @NotNull NodeState after,
             @NotNull Canceller hardCanceller,
@@ -249,7 +244,7 @@ public class CheckpointCompactor extends Compactor {
         return compactor.compactDown(before, after, hardCanceller, softCanceller);
     }
 
-    protected @Nullable CompactedNodeState compactWithDelegate(
+    private @Nullable CompactedNodeState compactWithDelegate(
             @NotNull NodeState before,
             @NotNull NodeState after,
             @NotNull NodeState onto,
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
index 901e24aec0..326c6e209e 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
@@ -18,6 +18,7 @@
 package org.apache.jackrabbit.oak.segment;
 
 import static java.util.Objects.requireNonNull;
+import static java.util.Objects.requireNonNullElseGet;
 import static org.apache.jackrabbit.oak.api.Type.BINARIES;
 import static org.apache.jackrabbit.oak.api.Type.BINARY;
 import static org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState.binaryProperty;
@@ -97,16 +98,41 @@ public class ClassicCompactor extends Compactor {
             @NotNull NodeState onto,
             @NotNull Canceller canceller
     ) throws IOException {
-        return compact(before, after, onto, canceller, Canceller.newCanceller());
+        return compact(before, after, onto, canceller, null);
     }
 
-    private @Nullable CompactedNodeState compact(
-            @NotNull NodeState before,
-            @NotNull NodeState after,
-            @NotNull NodeState onto,
-            @NotNull Canceller hardCanceller,
-            @NotNull Canceller softCanceller
-    ) throws IOException {
+    /**
+     * Compact the differences between {@code after} and {@code before} on top of {@code onto}. The
+     * {@code softCanceller} must be null, unless {@code after.equals(onto)}, i.e. if the method is
+     * called for a {@link #compactDown(NodeState, NodeState, Canceller, Canceller)} scenario.
+     * .
+     *
+     * @param before        The node state used as the baseline for the diff.
+     * @param after         The node state used as the target for the diff.
+     * @param onto          The node state to apply the diff to.
+     * @param hardCanceller The trigger for hard cancellation, will abandon compaction if cancelled.
+     * @param softCanceller The trigger for soft cancellation, will return partially compacted state if cancelled.
+     *                      Must be null unless {@code after.equals(onto)}.
+     * @return The compacted node state or {@code null} if hard-cancelled.
+     * @throws IOException Will throw exception if any errors occur during compaction.
+     */
+    protected @Nullable CompactedNodeState compact(
+            @NotNull NodeState before,
+            @NotNull NodeState after,
+            @NotNull NodeState onto,
+            @NotNull Canceller hardCanceller,
+            @Nullable Canceller softCanceller) throws IOException {
+        return internalCompact(before, after, onto, hardCanceller, softCanceller);
+    }
+
+    // the private method is necessary to prevent the call in CompactDiff#childNodeUpdated,
+    // to call an overridden #compact method instead of the one in this class.
+    private @Nullable CompactedNodeState internalCompact(
+            @NotNull NodeState before,
+            @NotNull NodeState after,
+            @NotNull NodeState onto,
+            @NotNull Canceller hardCanceller,
+            @Nullable Canceller softCanceller) throws IOException {
         CompactedNodeState compactedState = getPreviouslyCompactedState(after);
         if (compactedState == null) {
             compactedState = new CompactDiff(onto, hardCanceller, softCanceller).diff(before, after);
@@ -149,11 +175,11 @@ public class ClassicCompactor extends Compactor {
             }
         }
 
-        CompactDiff(@NotNull NodeState base, @NotNull Canceller hardCanceller, @NotNull Canceller softCanceller) {
+        CompactDiff(@NotNull NodeState base, @NotNull Canceller hardCanceller, @Nullable Canceller softCanceller) {
             this.base = requireNonNull(base);
             this.builder = new MemoryNodeBuilder(base);
             this.hardCanceller = requireNonNull(hardCanceller);
-            this.softCanceller = requireNonNull(softCanceller);
+            this.softCanceller = requireNonNullElseGet(softCanceller, Canceller::newCanceller);
         }
 
         private @NotNull CancelableDiff newCancelableDiff() {
@@ -200,7 +226,7 @@ public class ClassicCompactor extends Compactor {
             try {
                 NodeState child = base.getChildNode(name);
                 NodeState onto = child.exists() ? child : EMPTY_NODE;
-                CompactedNodeState compacted = compact(before, after, onto, hardCanceller, softCanceller);
+                CompactedNodeState compacted = internalCompact(before, after, onto, hardCanceller, softCanceller);
                 if (compacted == null) {
                     return false;
                 }
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
index 8fef492a99..0e4e4a0d55 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
@@ -23,8 +23,8 @@ import org.apache.jackrabbit.oak.commons.Buffer;
 import org.apache.jackrabbit.oak.commons.conditions.Validate;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeBuilder;
 import org.apache.jackrabbit.oak.segment.file.CompactedNodeState;
-import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
 import org.apache.jackrabbit.oak.segment.file.CompactionWriter;
+import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
 import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -55,7 +55,7 @@ import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE
  * Every node at this depth will be an entry point for asynchronous compaction. After the exploration phase,
  * the main thread will collect these compaction results and write their parents' node state to disk.
  */
-public class ParallelCompactor extends CheckpointCompactor {
+public class ParallelCompactor extends ClassicCompactor {
     /**
      * Expand repository tree until there are this many nodes for each worker to compact. Tradeoff
      * between inefficiency of many small tasks and high risk of at least one of the subtrees being
@@ -68,6 +68,8 @@ public class ParallelCompactor extends CheckpointCompactor {
      */
    private static final int EXPLORATION_UPPER_LIMIT = 100_000;
 
+    private final GCMonitor gcListener;
+
     private final int numWorkers;
 
     private final long totalSizeEstimate;
@@ -91,12 +93,13 @@ public class ParallelCompactor extends CheckpointCompactor {
             @NotNull CompactionWriter writer,
             @NotNull GCNodeWriteMonitor compactionMonitor,
             int nThreads) {
-        super(gcListener, writer, compactionMonitor);
+        super(writer, compactionMonitor);
         if (nThreads < 0) {
             nThreads += Runtime.getRuntime().availableProcessors() + 1;
         }
-        numWorkers = Math.max(0, nThreads - 1);
-        totalSizeEstimate = compactionMonitor.getEstimatedTotal();
+        this.gcListener = gcListener;
+        this.numWorkers = Math.max(0, nThreads - 1);
+        this.totalSizeEstimate = compactionMonitor.getEstimatedTotal();
     }
 
     /**
@@ -131,7 +134,7 @@ public class ParallelCompactor extends CheckpointCompactor {
         }
 
         @NotNull PropertyState compact() {
-            return compactor.compact(state);
+            return ParallelCompactor.this.compact(state);
         }
     }
 
@@ -142,7 +145,7 @@ public class ParallelCompactor extends CheckpointCompactor {
         @Nullable List<Entry<String, CompactionTree>> expand(@NotNull Canceller hardCanceller) {
             Validate.checkState(compactionFuture == null);
-            CompactedNodeState compactedState = compactor.getPreviouslyCompactedState(after);
+            CompactedNodeState compactedState = getPreviouslyCompactedState(after);
             if (compactedState != null) {
                 compactionFuture = CompletableFuture.completedFuture(compactedState);
                 return Collections.emptyList();
             }
@@ -203,14 +206,9 @@ public class ParallelCompactor extends CheckpointCompactor {
         void compactAsync(@NotNull Canceller hardCanceller, @Nullable Canceller softCanceller) {
             if (compactionFuture == null) {
                 requireNonNull(executorService);
-                if (softCanceller == null) {
-                    compactionFuture = executorService.submit(() ->
-                            compactor.compact(before, after, onto, hardCanceller));
-                } else {
-                    Validate.checkState(onto.equals(after));
-                    compactionFuture = executorService.submit(() ->
-                            compactor.compactDown(before, after, hardCanceller, softCanceller));
-                }
+                Validate.checkState(softCanceller == null || onto.equals(after));
+                compactionFuture = executorService.submit(() ->
+                        ParallelCompactor.super.compact(before, after, onto, hardCanceller, softCanceller));
             }
         }
 
@@ -267,7 +265,7 @@ public class ParallelCompactor extends CheckpointCompactor {
                     builder.setChildNode(entry.getKey(), compactedState);
                 }
             }
-            return compactor.writeNodeState(builder.getNodeState(), stableIdBytes, false);
+            return writeNodeState(builder.getNodeState(), stableIdBytes, false);
         }
     }
 
@@ -283,7 +281,7 @@ public class ParallelCompactor extends CheckpointCompactor {
                 builder.removeProperty(name);
             }
 
-            return compactor.writeNodeState(builder.getNodeState(), stableIdBytes, true);
+            return writeNodeState(builder.getNodeState(), stableIdBytes, true);
         }
     }
 
@@ -296,13 +294,7 @@ public class ParallelCompactor extends CheckpointCompactor {
         private final @NotNull Canceller hardCanceller;
         private final @Nullable Canceller softCanceller;
-        CompactionHandler(@NotNull NodeState base, @NotNull Canceller hardCanceller) {
-            this.base = base;
-            this.hardCanceller = hardCanceller;
-            this.softCanceller = null;
-        }
-
-        CompactionHandler(@NotNull NodeState base, @NotNull Canceller hardCanceller, @NotNull Canceller softCanceller) {
+        CompactionHandler(@NotNull NodeState base, @NotNull Canceller hardCanceller, @Nullable Canceller softCanceller) {
             this.base = base;
             this.hardCanceller = hardCanceller;
             this.softCanceller = softCanceller;
         }
@@ -385,30 +377,11 @@ public class ParallelCompactor extends CheckpointCompactor {
     }
 
     @Override
-    protected @Nullable CompactedNodeState compactDownWithDelegate(
-            @NotNull NodeState before,
-            @NotNull NodeState after,
-            @NotNull Canceller hardCanceller,
-            @NotNull Canceller softCanceller
-    ) throws IOException {
-        if (initializeExecutor()) {
-            return new CompactionHandler(after, hardCanceller, softCanceller).diff(before, after);
-        } else {
-            return super.compactDownWithDelegate(before, after, hardCanceller, softCanceller);
-        }
-    }
-
-    @Override
-    protected @Nullable CompactedNodeState compactWithDelegate(
-            @NotNull NodeState before,
-            @NotNull NodeState after,
-            @NotNull NodeState onto,
-            @NotNull Canceller canceller
-    ) throws IOException {
+    protected @Nullable CompactedNodeState compact(@NotNull NodeState before, @NotNull NodeState after, @NotNull NodeState onto, @NotNull Canceller hardCanceller, @Nullable Canceller softCanceller) throws IOException {
         if (initializeExecutor()) {
-            return new CompactionHandler(onto, canceller).diff(before, after);
+            return new CompactionHandler(onto, hardCanceller, softCanceller).diff(before, after);
         } else {
-            return super.compactWithDelegate(before, after, onto, canceller);
+            return super.compact(before, after, onto, hardCanceller, softCanceller);
         }
     }
 }
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
index f368f8ec2a..19df646df4 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
@@ -33,7 +33,6 @@ import org.apache.jackrabbit.oak.segment.Compactor;
 import org.apache.jackrabbit.oak.segment.ParallelCompactor;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.SegmentNodeState;
-import org.apache.jackrabbit.oak.segment.SegmentWriter;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType;
 import org.apache.jackrabbit.oak.segment.file.cancel.Cancellation;
@@ -295,10 +294,12 @@
         CompactorType compactorType = context.getGCOptions().getCompactorType();
         switch (compactorType) {
             case PARALLEL_COMPACTOR:
-                return new ParallelCompactor(context.getGCListener(), writer, context.getCompactionMonitor(),
-                        context.getGCOptions().getConcurrency());
+                return new CheckpointCompactor(context.getGCListener(),
+                        new ParallelCompactor(context.getGCListener(), writer, context.getCompactionMonitor(),
+                                context.getGCOptions().getConcurrency()));
             case CHECKPOINT_COMPACTOR:
-                return new CheckpointCompactor(context.getGCListener(), writer, context.getCompactionMonitor());
+                return new CheckpointCompactor(context.getGCListener(),
+                        new ClassicCompactor(writer, context.getCompactionMonitor()));
             case CLASSIC_COMPACTOR:
                 return new ClassicCompactor(writer, context.getCompactionMonitor());
             default:
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
index 0bdfb39a5d..955eaa2952 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
@@ -40,6 +40,6 @@ public class CheckpointCompactorExternalBlobTest extends AbstractCompactorExtern
     protected CheckpointCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
         SegmentWriter writer = defaultSegmentWriterBuilder("c").withGeneration(generation).build(fileStore);
         CompactionWriter compactionWriter = new CompactionWriter(fileStore.getReader(), fileStore.getBlobStore(), generation, writer);
-        return new CheckpointCompactor(GCMonitor.EMPTY, compactionWriter, GCNodeWriteMonitor.EMPTY);
+        return new CheckpointCompactor(GCMonitor.EMPTY, new ClassicCompactor(compactionWriter, GCNodeWriteMonitor.EMPTY));
     }
 }
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
index 4c4150a0ce..8f9ad60c2b 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
@@ -46,6 +46,6 @@ public class CheckpointCompactorTest extends AbstractCompactorTest {
                 .withGeneration(generation)
                 .build(fileStore);
         CompactionWriter compactionWriter = new CompactionWriter(fileStore.getReader(), fileStore.getBlobStore(), increment, writerFactory);
-        return new CheckpointCompactor(GCMonitor.EMPTY, compactionWriter, compactionMonitor);
+        return new CheckpointCompactor(GCMonitor.EMPTY, new ClassicCompactor(compactionWriter, GCNodeWriteMonitor.EMPTY));
     }
 }
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
index 210dd60c61..06a26bb3f9 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
@@ -57,12 +57,13 @@ public class ParallelCompactorExternalBlobTest extends AbstractCompactorExternal
     }
 
     @Override
-    protected ParallelCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
+    protected Compactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
         SegmentWriter writer = defaultSegmentWriterBuilder("c")
                 .withGeneration(generation)
                 .withWriterPool(SegmentBufferWriterPool.PoolType.THREAD_SPECIFIC)
                 .build(fileStore);
         CompactionWriter compactionWriter = new CompactionWriter(fileStore.getReader(), fileStore.getBlobStore(), generation, writer);
-        return new ParallelCompactor(GCMonitor.EMPTY, compactionWriter, GCNodeWriteMonitor.EMPTY, concurrency);
+        return new CheckpointCompactor(GCMonitor.EMPTY,
+                new ParallelCompactor(GCMonitor.EMPTY, compactionWriter, GCNodeWriteMonitor.EMPTY, concurrency));
     }
 }
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
index 2855516ab7..f4e4d81ae8 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
@@ -57,7 +57,7 @@ public class ParallelCompactorTest extends AbstractCompactorTest {
     }
 
     @Override
-    protected ParallelCompactor createCompactor(
+    protected Compactor createCompactor(
             @NotNull FileStore fileStore,
             @NotNull GCIncrement increment,
             @NotNull GCNodeWriteMonitor compactionMonitor
@@ -67,6 +67,7 @@ public class ParallelCompactorTest extends AbstractCompactorTest {
                 .withWriterPool(SegmentBufferWriterPool.PoolType.THREAD_SPECIFIC)
                 .build(fileStore);
         CompactionWriter compactionWriter = new CompactionWriter(fileStore.getReader(), fileStore.getBlobStore(), increment, writerFactory);
-        return new ParallelCompactor(GCMonitor.EMPTY, compactionWriter, compactionMonitor, concurrency);
+        return new CheckpointCompactor(GCMonitor.EMPTY,
+                new ParallelCompactor(GCMonitor.EMPTY, compactionWriter, compactionMonitor, concurrency));
    }
 }
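For readers skimming the diff: the net effect of the refactoring is that CheckpointCompactor no
longer constructs its own ClassicCompactor but wraps an injected delegate, and ParallelCompactor
now extends ClassicCompactor instead of CheckpointCompactor. The sketch below is not part of the
commit; the helper class and method names are illustrative only. It mirrors the constructor wiring
shown above in AbstractCompactionStrategy and in the updated tests:

    import org.apache.jackrabbit.oak.segment.CheckpointCompactor;
    import org.apache.jackrabbit.oak.segment.ClassicCompactor;
    import org.apache.jackrabbit.oak.segment.Compactor;
    import org.apache.jackrabbit.oak.segment.ParallelCompactor;
    import org.apache.jackrabbit.oak.segment.file.CompactionWriter;
    import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
    import org.apache.jackrabbit.oak.spi.gc.GCMonitor;

    // Illustrative helper, not part of the commit.
    class CompactorWiringSketch {

        // CHECKPOINT_COMPACTOR: checkpoint-aware wrapper delegating to the sequential ClassicCompactor.
        static Compactor checkpointCompactor(GCMonitor gcListener, CompactionWriter writer,
                                             GCNodeWriteMonitor monitor) {
            return new CheckpointCompactor(gcListener, new ClassicCompactor(writer, monitor));
        }

        // PARALLEL_COMPACTOR: the same wrapper, delegating to a ParallelCompactor (now a
        // ClassicCompactor subclass) that hands subtrees to worker threads.
        static Compactor parallelCompactor(GCMonitor gcListener, CompactionWriter writer,
                                           GCNodeWriteMonitor monitor, int concurrency) {
            return new CheckpointCompactor(gcListener,
                    new ParallelCompactor(gcListener, writer, monitor, concurrency));
        }
    }

The commit thus trades inheritance for composition: checkpoint handling lives only in
CheckpointCompactor, while the injected delegate decides whether individual subtrees are compacted
sequentially or in parallel.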
