This is an automated email from the ASF dual-hosted git repository.
jsedding pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new d7fabe36cc OAK-12069 - reduce SegmentPreloader memory usage (#2696)
d7fabe36cc is described below
commit d7fabe36cc5be7510c1984d6fa0d230e88fb3b73
Author: Julian Sedding <[email protected]>
AuthorDate: Fri Jan 23 14:58:36 2026 +0100
OAK-12069 - reduce SegmentPreloader memory usage (#2696)
- remove segment-graph cache
- read segment references from segment data
---
.../oak/segment/file/preloader/SegmentIds.java | 64 +++++++
.../segment/file/preloader/SegmentPreloader.java | 185 ++++++++++++++-------
.../file/preloader/SegmentPreloaderTest.java | 37 +++--
3 files changed, 210 insertions(+), 76 deletions(-)
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentIds.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentIds.java
new file mode 100644
index 0000000000..61254bb9ca
--- /dev/null
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentIds.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.file.preloader;
+
/**
 * A simple, memory-efficient data structure to hold segment IDs (UUIDs) as pairs of long values
 * (most significant bits and least significant bits).
 * <p>
 * All IDs live interleaved in a single {@code long[]}: the entry at index {@code i} occupies
 * slot {@code 2 * i} (most significant bits) and slot {@code 2 * i + 1} (least significant
 * bits), so no {@link java.util.UUID} object is allocated per entry. The capacity is fixed at
 * construction time and every entry initially holds {@code 0} for both halves. The class
 * performs no synchronization of its own.
 */
final class SegmentIds {

    /**
     * Largest supported capacity: the backing array needs {@code 2 * size} slots, which must
     * still fit into an {@code int}.
     */
    private static final int MAX_SIZE = Integer.MAX_VALUE / 2;

    /**
     * Shared instance with capacity zero. Every index is out of bounds for it, so
     * {@link #add(int, long, long)} always throws and the instance can never be modified.
     */
    public static final SegmentIds EMPTY = new SegmentIds(0);

    // Interleaved storage: [msb0, lsb0, msb1, lsb1, ...]
    private final long[] segmentIds;

    /**
     * Creates an instance with room for exactly {@code size} segment IDs.
     *
     * @param size the number of segment IDs to hold
     * @throws IllegalArgumentException if {@code size} is negative, or so large that the
     *         backing array length ({@code 2 * size}) would overflow an {@code int}
     */
    SegmentIds(int size) {
        // Without this check, both cases surface as a far less helpful NegativeArraySizeException
        // (an overflowing 2 * size wraps around to a negative int).
        if (size < 0 || size > MAX_SIZE) {
            throw new IllegalArgumentException("size must be between 0 and " + MAX_SIZE + ", is " + size);
        }
        this.segmentIds = new long[size * 2];
    }

    /**
     * Stores a segment ID at the given position, overwriting whatever was stored there before.
     *
     * @param index position in {@code [0, size())}
     * @param msb   most significant bits of the segment ID
     * @param lsb   least significant bits of the segment ID
     * @throws IndexOutOfBoundsException if {@code index} is outside {@code [0, size())}
     */
    public void add(int index, long msb, long lsb) {
        checkIndexWithinBounds(index);
        segmentIds[index * 2] = msb;
        segmentIds[index * 2 + 1] = lsb;
    }

    /**
     * Returns the most significant bits of the segment ID stored at {@code index}.
     *
     * @throws IndexOutOfBoundsException if {@code index} is outside {@code [0, size())}
     */
    public long getMsb(int index) {
        checkIndexWithinBounds(index);
        return segmentIds[index * 2];
    }

    /**
     * Returns the least significant bits of the segment ID stored at {@code index}.
     *
     * @throws IndexOutOfBoundsException if {@code index} is outside {@code [0, size())}
     */
    public long getLsb(int index) {
        checkIndexWithinBounds(index);
        return segmentIds[index * 2 + 1];
    }

    /**
     * Returns the capacity fixed at construction time (not the number of {@code add} calls).
     */
    public int size() {
        return segmentIds.length / 2;
    }

    /**
     * Returns {@code true} if this instance was created with capacity zero.
     */
    public boolean isEmpty() {
        return segmentIds.length == 0;
    }

    private void checkIndexWithinBounds(int index) {
        if (index < 0 || index >= size()) {
            throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size());
        }
    }
}
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java
index 0a136710fc..49e0f84cb6 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java
@@ -21,22 +21,21 @@ package org.apache.jackrabbit.oak.segment.file.preloader;
import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.commons.internal.function.Suppliers;
import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.data.SegmentData;
import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
import
org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.DelegatingPersistentCache;
import
org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
import
org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -60,9 +59,15 @@ public class SegmentPreloader extends
DelegatingPersistentCache implements Close
private static final Logger LOG =
LoggerFactory.getLogger(SegmentPreloader.class);
- private final Map<Integer, String> inProgressPrefetch;
+ private static final Thread.UncaughtExceptionHandler
UNCAUGHT_EXCEPTION_HANDLER = (t, e) -> {
+ if (!(e instanceof InterruptedException)) {
+ LOG.warn("Uncaught exception in thread {} ({}, {})", t.getName(),
e.getClass(), e.getMessage(), e);
+ }
+ };
+
+ private static final int DISPATCH_QUEUE_MAX_SIZE = 10_000;
- private final ConcurrentHashMap<String, Map<UUID, Set<UUID>>> graphCache;
+ private final Map<Integer, String> inProgressPrefetch;
private final PersistentCache delegate;
@@ -70,10 +75,12 @@ public class SegmentPreloader extends
DelegatingPersistentCache implements Close
private final ExecutorService preloadPool;
- private final int preloadDepth;
-
private final Supplier<TarFiles> tarFiles;
+ private final int maxPreloadDepth;
+
+ private volatile int preloadDepth;
+
/**
* Factory method that decorates the given {@link PersistentCache} with a
* {@link SegmentPreloader} if the given configuration requires preloading.
@@ -95,18 +102,45 @@ public class SegmentPreloader extends
DelegatingPersistentCache implements Close
this.delegate = delegate;
this.tarFiles = Suppliers.memoize(tarFiles);
this.inProgressPrefetch = new ConcurrentHashMap<>();
- this.graphCache = new ConcurrentHashMap<>();
- this.preloadDepth = config.getMaxPreloadDepth();
+ this.maxPreloadDepth = config.getMaxPreloadDepth();
+ this.preloadDepth = adaptPreloadDepth(this.maxPreloadDepth, 0);
this.dispatchPool = new ThreadPoolExecutor(1,1,
1, TimeUnit.SECONDS,
new PriorityBlockingQueue<>(),
- r -> new Thread(r, "segment-preload-dispatcher")) {
+ r -> {
+ Thread thread = new Thread(r,
"segment-preload-dispatcher");
+
thread.setUncaughtExceptionHandler(UNCAUGHT_EXCEPTION_HANDLER);
+ return thread;
+ }) {
+
+ private volatile long lastLoggedTime = System.currentTimeMillis();
+
+ @Override
+ public void execute(@NotNull Runnable command) {
+ if (getQueue().size() < DISPATCH_QUEUE_MAX_SIZE) {
+ super.execute(command);
+ }
+ }
+
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
clearInProgressTask(r);
+ int size = getQueue().size();
+ int adaptedPreloadDepth = adaptPreloadDepth(maxPreloadDepth,
size / (double) DISPATCH_QUEUE_MAX_SIZE);
+ if (adaptedPreloadDepth != preloadDepth) {
+ preloadDepth = adaptedPreloadDepth;
+ LOG.debug("Adjusted preload depth to {} (queue size: {})",
preloadDepth, size);
+ }
+
+ long now = System.currentTimeMillis();
+ if (lastLoggedTime + 15_000 < now) {
+ lastLoggedTime = now;
+ LOG.info("Dispatch pool queue size: {}, current preload
depth: {}", size, preloadDepth);
+ }
}
};
+
int preloadThreads = config.getConcurrency();
ThreadPoolExecutor preloadPool = new ThreadPoolExecutor(Math.max(1,
preloadThreads / 4), preloadThreads,
5, TimeUnit.SECONDS,
@@ -114,11 +148,8 @@ public class SegmentPreloader extends
DelegatingPersistentCache implements Close
r -> {
String threadName = String.format("segment-preload-%s",
Long.toHexString(System.nanoTime() & 0xFFFFF));
Thread thread = new Thread(r, threadName);
- thread.setUncaughtExceptionHandler((t, e) -> {
- if (!(e instanceof InterruptedException)) {
- LOG.warn("Uncaught exception in thread {}",
t.getName(), e);
- }
- });
+ thread.setPriority(Thread.MIN_PRIORITY);
+
thread.setUncaughtExceptionHandler(UNCAUGHT_EXCEPTION_HANDLER);
return thread;
},
(r, executor) -> {
@@ -141,6 +172,18 @@ public class SegmentPreloader extends
DelegatingPersistentCache implements Close
this.preloadPool = preloadPool;
}
+ @VisibleForTesting
+ static int adaptPreloadDepth(int maxPreloadDepth, double
queueFillPercentage) {
+ double remainingCapacity = 1.0 - queueFillPercentage;
+ double capacitySlice = 1.0 / maxPreloadDepth;
+ for (int i = 1; i < maxPreloadDepth; i++) {
+ if (remainingCapacity <= i * capacitySlice) {
+ return i;
+ }
+ }
+ return maxPreloadDepth;
+ }
+
@Override
protected PersistentCache delegate() {
return delegate;
@@ -148,21 +191,19 @@ public class SegmentPreloader extends
DelegatingPersistentCache implements Close
@Override
public @Nullable Buffer readSegment(long msb, long lsb, @NotNull
Callable<Buffer> loader) {
- dispatch(msb, lsb);
- return delegate().readSegment(msb, lsb, loader);
- }
-
- private void dispatch(long msb, long lsb) {
- dispatch(msb, lsb, 1);
+ Buffer buffer = super.readSegment(msb, lsb, loader);
+ dispatch(msb, lsb, getReferencedSegments(lsb, buffer), 1);
+ return buffer;
}
- private void dispatch(long msb, long lsb, int depth) {
- execute(dispatchPool, createDispatchTask(msb, lsb, depth));
+ private void dispatch(long msb, long lsb, SegmentIds referencedSegments,
int depth) {
+ if (depth <= preloadDepth && !referencedSegments.isEmpty() &&
SegmentId.isDataSegmentId(lsb)) {
+ execute(dispatchPool, createDispatchTask(msb, lsb,
referencedSegments, depth));
+ }
}
- @NotNull SegmentPreloader.DispatchTask createDispatchTask(long msb, long
lsb, int depth) {
- TarFiles tars = tarFiles.get();
- return new DispatchTask(tars, tars::getIndices, msb, lsb, depth);
+ @NotNull DispatchTask createDispatchTask(long msb, long lsb, SegmentIds
referencedSegments, int depth) {
+ return new DispatchTask(msb, lsb, referencedSegments, depth);
}
private void preload(long msb, long lsb, int depth) {
@@ -205,26 +246,34 @@ public class SegmentPreloader extends
DelegatingPersistentCache implements Close
}
}
- class DispatchTask implements Runnable, Comparable<DispatchTask> {
-
- private final TarFiles tarFiles;
+ private boolean wrapsInterruptedException(RuntimeException e) {
+ Throwable candidate = e;
+ while (candidate.getCause() != null) {
+ if (candidate.getCause() instanceof InterruptedException) {
+ return true;
+ }
+ candidate = candidate.getCause();
+ }
+ return false;
+ }
- private final Supplier<Map<String, Set<UUID>>> indicesSupplier;
+ class DispatchTask implements Runnable, Comparable<DispatchTask> {
private final long msb;
private final long lsb;
+ private final SegmentIds references;
+
private final int depth;
private final long creationTime = System.nanoTime();
- private DispatchTask(@NotNull TarFiles tarFiles, Supplier<Map<String,
Set<UUID>>> indicesSupplier, long msb, long lsb, int depth) {
- checkArgument(depth <= preloadDepth, "depth must be <= %d, is %d",
preloadDepth, depth);
- this.tarFiles = tarFiles;
- this.indicesSupplier = indicesSupplier;
+ private DispatchTask(long msb, long lsb, SegmentIds references, int
depth) {
+ checkArgument(depth <= maxPreloadDepth, "depth must be <= %d, is
%d", maxPreloadDepth, depth);
this.msb = msb;
this.lsb = lsb;
+ this.references = references;
this.depth = depth;
LOG.debug("Created: {}", this);
}
@@ -232,30 +281,19 @@ public class SegmentPreloader extends
DelegatingPersistentCache implements Close
@Override
public void run() {
LOG.debug("Running: {}", this);
- UUID uuid = new UUID(msb, lsb);
- Map<String, Set<UUID>> indices = indicesSupplier.get();
- String archiveName = indices.entrySet().stream()
- .filter(entry -> entry.getValue().contains(uuid))
- .findFirst()
- .map(Map.Entry::getKey)
- .orElse(null);
-
- Map<UUID, Set<UUID>> graph =
graphCache.computeIfAbsent(archiveName, name -> {
- try {
- return tarFiles.getGraph(name);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- });
- for (UUID reference : graph.get(uuid)) {
- long refMsb = reference.getMostSignificantBits();
- long refLsb = reference.getLeastSignificantBits();
- if (!delegate.containsSegment(refMsb, refLsb)) {
+ try {
+ for (int i = 0; i < references.size(); i++) {
+ long refMsb = references.getMsb(i);
+ long refLsb = references.getLsb(i);
preload(refMsb, refLsb, depth);
- } else if (depth < preloadDepth &&
SegmentId.isDataSegmentId(refLsb)) {
- dispatch(refMsb, refLsb, depth + 1);
}
+ } catch (RuntimeException e) {
+ if (wrapsInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ throw e;
}
}
@@ -278,7 +316,7 @@ public class SegmentPreloader extends
DelegatingPersistentCache implements Close
@Override
public String toString() {
- return "DispatchTask{segmentId=" + new UUID(msb, lsb) + ", depth="
+ depth + '}';
+ return "DispatchTask{segmentId=" + new UUID(msb, lsb) + ", depth="
+ depth + ", references=" + references.size() + '}';
}
private int getPreloadDepth() {
@@ -298,6 +336,22 @@ public class SegmentPreloader extends
DelegatingPersistentCache implements Close
}
}
+ private static SegmentIds getReferencedSegments(long lsb, @Nullable Buffer
buffer) {
+ if (buffer == null || !SegmentId.isDataSegmentId(lsb)) {
+ return SegmentIds.EMPTY;
+ }
+ SegmentData segmentData = SegmentData.newSegmentData(buffer);
+ int referencedSegmentsCount = segmentData.getSegmentReferencesCount();
+ if (referencedSegmentsCount == 0) {
+ return SegmentIds.EMPTY;
+ }
+ SegmentIds segmentIds = new SegmentIds(referencedSegmentsCount);
+ for (int i = 0; i < referencedSegmentsCount; i++) {
+ segmentIds.add(i, segmentData.getSegmentReferenceMsb(i),
segmentData.getSegmentReferenceLsb(i));
+ }
+ return segmentIds;
+ }
+
class PreloadTask implements Runnable {
private final TarFiles tarFiles;
@@ -309,7 +363,7 @@ public class SegmentPreloader extends
DelegatingPersistentCache implements Close
private final int depth;
private PreloadTask(TarFiles tarFiles, long msb, long lsb, int depth) {
- checkArgument(depth <= preloadDepth, "depth must be <= %d, is %d",
preloadDepth, depth);
+ checkArgument(depth <= maxPreloadDepth, "depth must be <= %d, is
%d", maxPreloadDepth, depth);
this.tarFiles = tarFiles;
this.msb = msb;
this.lsb = lsb;
@@ -320,14 +374,17 @@ public class SegmentPreloader extends
DelegatingPersistentCache implements Close
@Override
public void run() {
LOG.debug("Running: {}", this);
- if (depth < preloadDepth && SegmentId.isDataSegmentId(lsb)) {
- dispatch(msb, lsb, depth + 1);
- }
- if (!delegate.containsSegment(msb, lsb)) {
- Buffer segmentBuffer = tarFiles.readSegment(msb, lsb);
- if (segmentBuffer != null) {
- delegate.writeSegment(msb, lsb, segmentBuffer);
+ try {
+ if (depth < preloadDepth || !containsSegment(msb, lsb)) {
+ Buffer buffer = delegate().readSegment(msb, lsb, () ->
tarFiles.readSegment(msb, lsb));
+ dispatch(msb, lsb, getReferencedSegments(lsb, buffer),
depth + 1);
+ }
+ } catch (RuntimeException e) {
+ if (wrapsInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ return;
}
+ throw e;
}
}
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java
index 9be8947db9..37c887ac48 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java
@@ -184,22 +184,22 @@ public class SegmentPreloaderTest {
long msb = uuid.getMostSignificantBits();
long lsb = uuid.getLeastSignificantBits();
- DispatchTask task1 = preloader.createDispatchTask(msb, lsb, 1);
+ DispatchTask task1 = createDispatchTask(preloader, msb, lsb, 1);
assertEquals(task1, task1);
- DispatchTask task2 = preloader.createDispatchTask(msb, lsb, 1);
+ DispatchTask task2 = createDispatchTask(preloader, msb, lsb, 1);
assertEquals(task1, task2);
- DispatchTask task3 = preloader.createDispatchTask(msb, lsb, 0);
+ DispatchTask task3 = createDispatchTask(preloader, msb, lsb, 0);
assertNotEquals(task1, task3);
- DispatchTask task4 = preloader.createDispatchTask(msb, lsb + 1, 1);
+ DispatchTask task4 = createDispatchTask(preloader, msb, lsb + 1,
1);
assertNotEquals(task1, task4);
- DispatchTask task5 = preloader.createDispatchTask(msb + 1, lsb, 1);
+ DispatchTask task5 = createDispatchTask(preloader, msb + 1, lsb,
1);
assertNotEquals(task1, task5);
- assertNotEquals(task1, new Object());
+ assertNotEquals(new Object(), task1);
});
}
@@ -207,7 +207,7 @@ public class SegmentPreloaderTest {
public void testDispatchTaskArgumentValidation() throws IOException {
withSegmentPreloader(preloader -> {
UUID uuid = UUID.randomUUID();
- assertThrows(IllegalArgumentException.class, () ->
preloader.createDispatchTask(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits(), 3));
+ assertThrows(IllegalArgumentException.class, () ->
createDispatchTask(preloader, uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits(), 3));
});
}
@@ -216,8 +216,8 @@ public class SegmentPreloaderTest {
withSegmentPreloader(preloader -> {
UUID uuid = UUID.randomUUID();
assertEquals(
- "DispatchTask{segmentId=" + uuid + ", depth=1}",
-
preloader.createDispatchTask(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits(), 1).toString());
+ "DispatchTask{segmentId=" + uuid + ", depth=1,
references=0}",
+ createDispatchTask(preloader,
uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1).toString());
});
}
@@ -225,9 +225,9 @@ public class SegmentPreloaderTest {
public void testDispatchTaskCompareTo() throws IOException {
withSegmentPreloader(preloader -> {
UUID uuid = UUID.randomUUID();
- DispatchTask task1 =
preloader.createDispatchTask(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits(), 2);
- DispatchTask task2 =
preloader.createDispatchTask(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits(), 1);
- DispatchTask task3 =
preloader.createDispatchTask(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits(), 2);
+ DispatchTask task1 = createDispatchTask(preloader,
uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2);
+ DispatchTask task2 = createDispatchTask(preloader,
uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1);
+ DispatchTask task3 = createDispatchTask(preloader,
uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2);
List<DispatchTask> tasks = new ArrayList<>();
tasks.add(task1);
tasks.add(task2);
@@ -273,6 +273,15 @@ public class SegmentPreloaderTest {
});
}
+ @Test
+ public void testAdaptPreloadDepth() {
+ assertEquals(4, SegmentPreloader.adaptPreloadDepth(4, 0.0));
+ assertEquals(3, SegmentPreloader.adaptPreloadDepth(4, 0.25));
+ assertEquals(2, SegmentPreloader.adaptPreloadDepth(4, 0.5));
+ assertEquals(1, SegmentPreloader.adaptPreloadDepth(4, 0.75));
+ assertEquals(1, SegmentPreloader.adaptPreloadDepth(4, 1.0));
+ }
+
private void withSegmentPreloader(Consumer<SegmentPreloader>
withPreloader) throws IOException {
MemoryTestCache cache = new MemoryTestCache();
PersistentCachePreloadingConfiguration config =
@@ -289,6 +298,10 @@ public class SegmentPreloaderTest {
return UUID.fromString(parts[0]);
}
+ private static @NotNull DispatchTask createDispatchTask(SegmentPreloader
preloader, long msb, long lsb, int depth) {
+ return preloader.createDispatchTask(msb, lsb, SegmentIds.EMPTY, depth);
+ }
+
private void assertReferencedSegmentsLoaded(Set<UUID> referencedSegments,
MemoryTestCache underlyingCache, SegmentPreloader preloadingCache) {
Set<UUID> segments = new HashSet<>(referencedSegments);
int timeoutSec = 10;