This is an automated email from the ASF dual-hosted git repository.

jsedding pushed a commit to branch jsedding/OAK-11964-refactor-parallel-compaction
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit ce0a520c0fa33eb086ae966fbbf18d6ca40e24b2
Author: Julian Sedding <[email protected]>
AuthorDate: Mon Sep 29 16:15:01 2025 +0200

    OAK-11964 - refactor parallel compaction inheritance
---
 .../oak/segment/CheckpointCompactor.java           | 15 ++---
 .../jackrabbit/oak/segment/ClassicCompactor.java   | 48 ++++++++++++----
 .../jackrabbit/oak/segment/ParallelCompactor.java  | 65 +++++++---------------
 .../segment/file/AbstractCompactionStrategy.java   |  9 +--
 .../CheckpointCompactorExternalBlobTest.java       |  2 +-
 .../oak/segment/CheckpointCompactorTest.java       |  2 +-
 .../segment/ParallelCompactorExternalBlobTest.java |  5 +-
 .../oak/segment/ParallelCompactorTest.java         |  5 +-
 8 files changed, 74 insertions(+), 77 deletions(-)

diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
index 4d0e2cf87b..2385075503 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
@@ -38,8 +38,6 @@ import org.apache.jackrabbit.oak.commons.Buffer;
 import org.apache.jackrabbit.oak.commons.conditions.Validate;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryChildNodeEntry;
 import org.apache.jackrabbit.oak.segment.file.CompactedNodeState;
-import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
-import org.apache.jackrabbit.oak.segment.file.CompactionWriter;
 import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
@@ -71,16 +69,13 @@ public class CheckpointCompactor extends Compactor {
      * Create a new instance based on the passed arguments.
      *
      * @param gcListener        listener receiving notifications about the garbage collection process
-     * @param writer           segment writer used to serialise to segments
-     * @param compactionMonitor notification call back for each compacted nodes,
-     *                          properties, and binaries
+     * @param compactor         the delegate compactor to use for the actual compaction work
      */
     public CheckpointCompactor(
             @NotNull GCMonitor gcListener,
-            @NotNull CompactionWriter writer,
-            @NotNull GCNodeWriteMonitor compactionMonitor) {
+            @NotNull ClassicCompactor compactor) {
         this.gcListener = gcListener;
-        this.compactor = new ClassicCompactor(writer, compactionMonitor);
+        this.compactor = compactor;
     }
 
     @Override
@@ -240,7 +235,7 @@ public class CheckpointCompactor extends Compactor {
     /**
      * Delegate compaction to another, usually simpler, implementation.
      */
-    protected @Nullable CompactedNodeState compactDownWithDelegate(
+    private @Nullable CompactedNodeState compactDownWithDelegate(
             @NotNull NodeState before,
             @NotNull NodeState after,
             @NotNull Canceller hardCanceller,
@@ -249,7 +244,7 @@ public class CheckpointCompactor extends Compactor {
         return compactor.compactDown(before, after, hardCanceller, softCanceller);
     }
 
-    protected @Nullable CompactedNodeState compactWithDelegate(
+    private @Nullable CompactedNodeState compactWithDelegate(
             @NotNull NodeState before,
             @NotNull NodeState after,
             @NotNull NodeState onto,
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
index 901e24aec0..326c6e209e 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
@@ -18,6 +18,7 @@
 package org.apache.jackrabbit.oak.segment;
 
 import static java.util.Objects.requireNonNull;
+import static java.util.Objects.requireNonNullElseGet;
 import static org.apache.jackrabbit.oak.api.Type.BINARIES;
 import static org.apache.jackrabbit.oak.api.Type.BINARY;
 import static org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState.binaryProperty;
@@ -97,16 +98,41 @@ public class ClassicCompactor extends Compactor {
             @NotNull NodeState onto,
             @NotNull Canceller canceller
     ) throws IOException {
-        return compact(before, after, onto, canceller, Canceller.newCanceller());
+        return compact(before, after, onto, canceller, null);
     }
 
-    private @Nullable CompactedNodeState compact(
-        @NotNull NodeState before,
-        @NotNull NodeState after,
-        @NotNull NodeState onto,
-        @NotNull Canceller hardCanceller,
-        @NotNull Canceller softCanceller
-    ) throws IOException {
+    /**
+     * Compact the differences between {@code after} and {@code before} on top of {@code onto}. The
+     * {@code softCanceller} must be null, unless {@code after.equals(onto)}, i.e. if the method is
+     * called for a {@link #compactDown(NodeState, NodeState, Canceller, Canceller)} scenario.
+     * .
+     *
+     * @param before        The node state used as the baseline for the diff.
+     * @param after         The node state used as the target for the diff.
+     * @param onto          The node state to apply the diff to.
+     * @param hardCanceller The trigger for hard cancellation, will abandon compaction if cancelled.
+     * @param softCanceller The trigger for soft cancellation, will return partially compacted state if cancelled.
+     *                      Must be null unless {@code after.equals(onto)}.
+     * @return              The compacted node state or {@code null} if hard-cancelled.
+     * @throws IOException Will throw exception if any errors occur during compaction.
+     */
+    protected @Nullable CompactedNodeState compact(
+            @NotNull NodeState before,
+            @NotNull NodeState after,
+            @NotNull NodeState onto,
+            @NotNull Canceller hardCanceller,
+            @Nullable Canceller softCanceller) throws IOException {
+        return internalCompact(before, after, onto, hardCanceller, softCanceller);
+    }
+
+    // the private method is necessary to prevent the call in CompactDiff#childNodeUpdated,
+    // to call an overridden #compact method instead of the one in this class.
+    private @Nullable CompactedNodeState internalCompact(
+            @NotNull NodeState before,
+            @NotNull NodeState after,
+            @NotNull NodeState onto,
+            @NotNull Canceller hardCanceller,
+            @Nullable Canceller softCanceller) throws IOException {
         CompactedNodeState compactedState = getPreviouslyCompactedState(after);
         if (compactedState == null) {
             compactedState = new CompactDiff(onto, hardCanceller, softCanceller).diff(before, after);
@@ -149,11 +175,11 @@ public class ClassicCompactor extends Compactor {
             }
         }
 
-        CompactDiff(@NotNull NodeState base, @NotNull Canceller hardCanceller, @NotNull Canceller softCanceller) {
+        CompactDiff(@NotNull NodeState base, @NotNull Canceller hardCanceller, @Nullable Canceller softCanceller) {
             this.base = requireNonNull(base);
             this.builder = new MemoryNodeBuilder(base);
             this.hardCanceller = requireNonNull(hardCanceller);
-            this.softCanceller = requireNonNull(softCanceller);
+            this.softCanceller = requireNonNullElseGet(softCanceller, Canceller::newCanceller);
         }
 
         private @NotNull CancelableDiff newCancelableDiff() {
@@ -200,7 +226,7 @@ public class ClassicCompactor extends Compactor {
             try {
                 NodeState child = base.getChildNode(name);
                 NodeState onto = child.exists() ? child : EMPTY_NODE;
-                CompactedNodeState compacted = compact(before, after, onto, hardCanceller, softCanceller);
+                CompactedNodeState compacted = internalCompact(before, after, onto, hardCanceller, softCanceller);
                 if (compacted == null) {
                     return false;
                 }
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
index 8fef492a99..0e4e4a0d55 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
@@ -23,8 +23,8 @@ import org.apache.jackrabbit.oak.commons.Buffer;
 import org.apache.jackrabbit.oak.commons.conditions.Validate;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeBuilder;
 import org.apache.jackrabbit.oak.segment.file.CompactedNodeState;
-import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
 import org.apache.jackrabbit.oak.segment.file.CompactionWriter;
+import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
 import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -55,7 +55,7 @@ import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE
  * Every node at this depth will be an entry point for asynchronous compaction. After the exploration phase,
  * the main thread will collect these compaction results and write their parents' node state to disk.
  */
-public class ParallelCompactor extends CheckpointCompactor {
+public class ParallelCompactor extends ClassicCompactor {
     /**
      * Expand repository tree until there are this many nodes for each worker to compact. Tradeoff
      * between inefficiency of many small tasks and high risk of at least one of the subtrees being
@@ -68,6 +68,8 @@ public class ParallelCompactor extends CheckpointCompactor {
      */
     private static final int EXPLORATION_UPPER_LIMIT = 100_000;
 
+    private final GCMonitor gcListener;
+
     private final int numWorkers;
 
     private final long totalSizeEstimate;
@@ -91,12 +93,13 @@ public class ParallelCompactor extends CheckpointCompactor {
             @NotNull CompactionWriter writer,
             @NotNull GCNodeWriteMonitor compactionMonitor,
             int nThreads) {
-        super(gcListener, writer, compactionMonitor);
+        super(writer, compactionMonitor);
         if (nThreads < 0) {
             nThreads += Runtime.getRuntime().availableProcessors() + 1;
         }
-        numWorkers = Math.max(0, nThreads - 1);
-        totalSizeEstimate = compactionMonitor.getEstimatedTotal();
+        this.gcListener = gcListener;
+        this.numWorkers = Math.max(0, nThreads - 1);
+        this.totalSizeEstimate = compactionMonitor.getEstimatedTotal();
     }
 
     /**
@@ -131,7 +134,7 @@ public class ParallelCompactor extends CheckpointCompactor {
             }
 
             @NotNull PropertyState compact() {
-                return compactor.compact(state);
+                return ParallelCompactor.this.compact(state);
             }
         }
 
@@ -142,7 +145,7 @@ public class ParallelCompactor extends CheckpointCompactor {
 
         @Nullable List<Entry<String, CompactionTree>> expand(@NotNull Canceller hardCanceller) {
             Validate.checkState(compactionFuture == null);
-            CompactedNodeState compactedState = compactor.getPreviouslyCompactedState(after);
+            CompactedNodeState compactedState = getPreviouslyCompactedState(after);
             if (compactedState != null) {
                 compactionFuture = CompletableFuture.completedFuture(compactedState);
                 return Collections.emptyList();
@@ -203,14 +206,9 @@ public class ParallelCompactor extends CheckpointCompactor {
         void compactAsync(@NotNull Canceller hardCanceller, @Nullable Canceller softCanceller) {
             if (compactionFuture == null) {
                 requireNonNull(executorService);
-                if (softCanceller == null) {
-                    compactionFuture = executorService.submit(() ->
-                            compactor.compact(before, after, onto, hardCanceller));
-                } else {
-                    Validate.checkState(onto.equals(after));
-                    compactionFuture = executorService.submit(() ->
-                            compactor.compactDown(before, after, hardCanceller, softCanceller));
-                }
+                Validate.checkState(softCanceller == null || onto.equals(after));
+                compactionFuture = executorService.submit(() ->
+                        ParallelCompactor.super.compact(before, after, onto, hardCanceller, softCanceller));
             }
         }
 
@@ -267,7 +265,7 @@ public class ParallelCompactor extends CheckpointCompactor {
                             builder.setChildNode(entry.getKey(), compactedState);
                         }
                     }
-                    return compactor.writeNodeState(builder.getNodeState(), stableIdBytes, false);
+                    return writeNodeState(builder.getNodeState(), stableIdBytes, false);
                 }
             }
 
@@ -283,7 +281,7 @@ public class ParallelCompactor extends CheckpointCompactor {
                 builder.removeProperty(name);
             }
 
-            return compactor.writeNodeState(builder.getNodeState(), stableIdBytes, true);
+            return writeNodeState(builder.getNodeState(), stableIdBytes, true);
         }
     }
 
@@ -296,13 +294,7 @@ public class ParallelCompactor extends CheckpointCompactor {
         private final @NotNull Canceller hardCanceller;
         private final @Nullable Canceller softCanceller;
 
-        CompactionHandler(@NotNull NodeState base, @NotNull Canceller hardCanceller) {
-            this.base = base;
-            this.hardCanceller = hardCanceller;
-            this.softCanceller = null;
-        }
-
-        CompactionHandler(@NotNull NodeState base, @NotNull Canceller hardCanceller, @NotNull Canceller softCanceller) {
+        CompactionHandler(@NotNull NodeState base, @NotNull Canceller hardCanceller, @Nullable Canceller softCanceller) {
             this.base = base;
             this.hardCanceller = hardCanceller;
             this.softCanceller = softCanceller;
@@ -385,30 +377,11 @@ public class ParallelCompactor extends CheckpointCompactor {
     }
 
     @Override
-    protected @Nullable CompactedNodeState compactDownWithDelegate(
-            @NotNull NodeState before,
-            @NotNull NodeState after,
-            @NotNull Canceller hardCanceller,
-            @NotNull Canceller softCanceller
-    ) throws IOException {
-        if (initializeExecutor()) {
-            return new CompactionHandler(after, hardCanceller, softCanceller).diff(before, after);
-        } else {
-            return super.compactDownWithDelegate(before, after, hardCanceller, softCanceller);
-        }
-    }
-
-    @Override
-    protected @Nullable CompactedNodeState compactWithDelegate(
-            @NotNull NodeState before,
-            @NotNull NodeState after,
-            @NotNull NodeState onto,
-            @NotNull Canceller canceller
-    ) throws IOException {
+    protected @Nullable CompactedNodeState compact(@NotNull NodeState before, @NotNull NodeState after, @NotNull NodeState onto, @NotNull Canceller hardCanceller, @Nullable Canceller softCanceller) throws IOException {
         if (initializeExecutor()) {
-            return new CompactionHandler(onto, canceller).diff(before, after);
+            return new CompactionHandler(onto, hardCanceller, softCanceller).diff(before, after);
         } else {
-            return super.compactWithDelegate(before, after, onto, canceller);
+            return super.compact(before, after, onto, hardCanceller, softCanceller);
         }
     }
 }
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
index f368f8ec2a..19df646df4 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
@@ -33,7 +33,6 @@ import org.apache.jackrabbit.oak.segment.Compactor;
 import org.apache.jackrabbit.oak.segment.ParallelCompactor;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.SegmentNodeState;
-import org.apache.jackrabbit.oak.segment.SegmentWriter;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType;
 import org.apache.jackrabbit.oak.segment.file.cancel.Cancellation;
@@ -295,10 +294,12 @@ abstract class AbstractCompactionStrategy implements CompactionStrategy {
         CompactorType compactorType = context.getGCOptions().getCompactorType();
         switch (compactorType) {
             case PARALLEL_COMPACTOR:
-                return new ParallelCompactor(context.getGCListener(), writer, context.getCompactionMonitor(),
-                        context.getGCOptions().getConcurrency());
+                return new CheckpointCompactor(context.getGCListener(),
+                        new ParallelCompactor(context.getGCListener(), writer, context.getCompactionMonitor(),
+                                context.getGCOptions().getConcurrency()));
             case CHECKPOINT_COMPACTOR:
-                return new CheckpointCompactor(context.getGCListener(), writer, context.getCompactionMonitor());
+                return new CheckpointCompactor(context.getGCListener(),
+                        new ClassicCompactor(writer, context.getCompactionMonitor()));
             case CLASSIC_COMPACTOR:
                 return new ClassicCompactor(writer, context.getCompactionMonitor());
             default:
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
index 0bdfb39a5d..955eaa2952 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
@@ -40,6 +40,6 @@ public class CheckpointCompactorExternalBlobTest extends AbstractCompactorExtern
     protected CheckpointCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
         SegmentWriter writer = defaultSegmentWriterBuilder("c").withGeneration(generation).build(fileStore);
         CompactionWriter compactionWriter = new CompactionWriter(fileStore.getReader(), fileStore.getBlobStore(), generation, writer);
-        return new CheckpointCompactor(GCMonitor.EMPTY, compactionWriter, GCNodeWriteMonitor.EMPTY);
+        return new CheckpointCompactor(GCMonitor.EMPTY, new ClassicCompactor(compactionWriter, GCNodeWriteMonitor.EMPTY));
     }
 }
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
index 4c4150a0ce..8f9ad60c2b 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
@@ -46,6 +46,6 @@ public class CheckpointCompactorTest extends AbstractCompactorTest {
                 .withGeneration(generation)
                 .build(fileStore);
         CompactionWriter compactionWriter = new CompactionWriter(fileStore.getReader(), fileStore.getBlobStore(), increment, writerFactory);
-        return new CheckpointCompactor(GCMonitor.EMPTY, compactionWriter, compactionMonitor);
+        return new CheckpointCompactor(GCMonitor.EMPTY, new ClassicCompactor(compactionWriter, GCNodeWriteMonitor.EMPTY));
     }
 }
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
index 210dd60c61..06a26bb3f9 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
@@ -57,12 +57,13 @@ public class ParallelCompactorExternalBlobTest extends AbstractCompactorExternal
     }
 
     @Override
-    protected ParallelCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
+    protected Compactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
         SegmentWriter writer = defaultSegmentWriterBuilder("c")
                 .withGeneration(generation)
                 .withWriterPool(SegmentBufferWriterPool.PoolType.THREAD_SPECIFIC)
                 .build(fileStore);
         CompactionWriter compactionWriter = new CompactionWriter(fileStore.getReader(), fileStore.getBlobStore(), generation, writer);
-        return new ParallelCompactor(GCMonitor.EMPTY, compactionWriter, GCNodeWriteMonitor.EMPTY, concurrency);
+        return new CheckpointCompactor(GCMonitor.EMPTY,
+                new ParallelCompactor(GCMonitor.EMPTY, compactionWriter, GCNodeWriteMonitor.EMPTY, concurrency));
     }
 }
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
index 2855516ab7..f4e4d81ae8 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
@@ -57,7 +57,7 @@ public class ParallelCompactorTest extends AbstractCompactorTest {
     }
 
     @Override
-    protected ParallelCompactor createCompactor(
+    protected Compactor createCompactor(
             @NotNull FileStore fileStore,
             @NotNull GCIncrement increment,
             @NotNull GCNodeWriteMonitor compactionMonitor
@@ -67,6 +67,7 @@ public class ParallelCompactorTest extends AbstractCompactorTest {
                 .withWriterPool(SegmentBufferWriterPool.PoolType.THREAD_SPECIFIC)
                 .build(fileStore);
         CompactionWriter compactionWriter = new CompactionWriter(fileStore.getReader(), fileStore.getBlobStore(), increment, writerFactory);
-        return new ParallelCompactor(GCMonitor.EMPTY, compactionWriter, compactionMonitor, concurrency);
+        return new CheckpointCompactor(GCMonitor.EMPTY,
+                new ParallelCompactor(GCMonitor.EMPTY, compactionWriter, compactionMonitor, concurrency));
     }
 }
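
For reference, the following is a minimal sketch of how the compactors are wired together after this refactoring, mirroring the AbstractCompactionStrategy hunk above. The enclosing class and method are illustrative only; the GCMonitor, CompactionWriter, GCNodeWriteMonitor and concurrency values are assumed to come from the surrounding compaction context, and the default branch is an assumed error path.

    import org.apache.jackrabbit.oak.segment.CheckpointCompactor;
    import org.apache.jackrabbit.oak.segment.ClassicCompactor;
    import org.apache.jackrabbit.oak.segment.Compactor;
    import org.apache.jackrabbit.oak.segment.ParallelCompactor;
    import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
    import org.apache.jackrabbit.oak.segment.file.CompactionWriter;
    import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
    import org.apache.jackrabbit.oak.spi.gc.GCMonitor;

    // Illustrative wiring sketch: after OAK-11964, checkpoint handling is composed
    // around a delegate compactor instead of being inherited by ParallelCompactor.
    class CompactorWiringSketch {

        Compactor createCompactor(CompactorType type, GCMonitor gcListener, CompactionWriter writer,
                GCNodeWriteMonitor monitor, int concurrency) {
            switch (type) {
                case PARALLEL_COMPACTOR:
                    // ParallelCompactor now extends ClassicCompactor and only parallelizes the
                    // node-level compaction; checkpoint traversal is layered on top by delegation.
                    return new CheckpointCompactor(gcListener,
                            new ParallelCompactor(gcListener, writer, monitor, concurrency));
                case CHECKPOINT_COMPACTOR:
                    // CheckpointCompactor no longer builds its own ClassicCompactor internally;
                    // the delegate is passed in explicitly.
                    return new CheckpointCompactor(gcListener, new ClassicCompactor(writer, monitor));
                case CLASSIC_COMPACTOR:
                    return new ClassicCompactor(writer, monitor);
                default:
                    // assumption: unknown compactor types are rejected
                    throw new IllegalArgumentException("Unknown compactor type: " + type);
            }
        }
    }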
