This is an automated email from the ASF dual-hosted git repository. jsedding pushed a commit to branch jsedding/OAK-12069-reduce-SegmentPreloader-memory-usage in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit bc622d1ce80638c9abe7ea232afd980bb58625e0 Author: Julian Sedding <[email protected]> AuthorDate: Wed Jan 21 13:26:46 2026 +0100 OAK-12069 - reduce SegmentPreloader memory usage - remove segment-graph cache - read segment references from segment data --- .../oak/segment/file/preloader/SegmentIds.java | 64 +++++++ .../segment/file/preloader/SegmentPreloader.java | 185 ++++++++++++++------- .../file/preloader/SegmentPreloaderTest.java | 37 +++-- 3 files changed, 210 insertions(+), 76 deletions(-) diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentIds.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentIds.java new file mode 100644 index 0000000000..61254bb9ca --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentIds.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.file.preloader; + +/** + * A simple, memory-efficient data structure to hold segment IDs (UUIDs) as pairs of long values + * (most significant bits and least significant bits). + */ +final class SegmentIds { + + public static final SegmentIds EMPTY = new SegmentIds(0); + + private final long[] segmentIds; + + SegmentIds(int size) { + this.segmentIds = new long[size * 2]; + } + + public void add(int index, long msb, long lsb) { + checkIndexWithinBounds(index); + segmentIds[index * 2] = msb; + segmentIds[index * 2 + 1] = lsb; + } + + public long getMsb(int index) { + checkIndexWithinBounds(index); + return segmentIds[index * 2]; + } + + public long getLsb(int index) { + checkIndexWithinBounds(index); + return segmentIds[index * 2 + 1]; + } + + public int size() { + return segmentIds.length / 2; + } + + public boolean isEmpty() { + return segmentIds.length == 0; + } + + private void checkIndexWithinBounds(int index) { + if (index < 0 || segmentIds.length / 2 <= index) { + throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + (segmentIds.length / 2)); + } + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java index 0a136710fc..49e0f84cb6 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java @@ -21,22 +21,21 @@ package org.apache.jackrabbit.oak.segment.file.preloader; import org.apache.jackrabbit.oak.commons.Buffer; import org.apache.jackrabbit.oak.commons.internal.function.Suppliers; import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.data.SegmentData; import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.DelegatingPersistentCache; import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.Comparator; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -60,9 +59,15 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close private static final Logger LOG = LoggerFactory.getLogger(SegmentPreloader.class); - private final Map<Integer, String> inProgressPrefetch; + private static final Thread.UncaughtExceptionHandler UNCAUGHT_EXCEPTION_HANDLER = (t, e) -> { + if (!(e instanceof InterruptedException)) { + LOG.warn("Uncaught exception in thread {} ({}, {})", t.getName(), e.getClass(), e.getMessage(), e); + } + }; + + private static final int DISPATCH_QUEUE_MAX_SIZE = 10_000; - private final ConcurrentHashMap<String, Map<UUID, Set<UUID>>> graphCache; + private final Map<Integer, String> inProgressPrefetch; private final PersistentCache delegate; @@ -70,10 +75,12 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close private final ExecutorService preloadPool; - private final int preloadDepth; - private final Supplier<TarFiles> tarFiles; + private final int maxPreloadDepth; + + private volatile int preloadDepth; + /** * Factory method that decorates the given {@link PersistentCache} with a * {@link SegmentPreloader} if the given configuration requires preloading. @@ -95,18 +102,45 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close this.delegate = delegate; this.tarFiles = Suppliers.memoize(tarFiles); this.inProgressPrefetch = new ConcurrentHashMap<>(); - this.graphCache = new ConcurrentHashMap<>(); - this.preloadDepth = config.getMaxPreloadDepth(); + this.maxPreloadDepth = config.getMaxPreloadDepth(); + this.preloadDepth = adaptPreloadDepth(this.maxPreloadDepth, 0); this.dispatchPool = new ThreadPoolExecutor(1,1, 1, TimeUnit.SECONDS, new PriorityBlockingQueue<>(), - r -> new Thread(r, "segment-preload-dispatcher")) { + r -> { + Thread thread = new Thread(r, "segment-preload-dispatcher"); + thread.setUncaughtExceptionHandler(UNCAUGHT_EXCEPTION_HANDLER); + return thread; + }) { + + private volatile long lastLoggedTime = System.currentTimeMillis(); + + @Override + public void execute(@NotNull Runnable command) { + if (getQueue().size() < DISPATCH_QUEUE_MAX_SIZE) { + super.execute(command); + } + } + @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); clearInProgressTask(r); + int size = getQueue().size(); + int adaptedPreloadDepth = adaptPreloadDepth(maxPreloadDepth, size / (double) DISPATCH_QUEUE_MAX_SIZE); + if (adaptedPreloadDepth != preloadDepth) { + preloadDepth = adaptedPreloadDepth; + LOG.debug("Adjusted preload depth to {} (queue size: {})", preloadDepth, size); + } + + long now = System.currentTimeMillis(); + if (lastLoggedTime + 15_000 < now) { + lastLoggedTime = now; + LOG.info("Dispatch pool queue size: {}, current preload depth: {}", size, preloadDepth); + } } }; + int preloadThreads = config.getConcurrency(); ThreadPoolExecutor preloadPool = new ThreadPoolExecutor(Math.max(1, preloadThreads / 4), preloadThreads, 5, TimeUnit.SECONDS, @@ -114,11 +148,8 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close r -> { String threadName = String.format("segment-preload-%s", Long.toHexString(System.nanoTime() & 0xFFFFF)); Thread thread = new Thread(r, threadName); - thread.setUncaughtExceptionHandler((t, e) -> { - if (!(e instanceof InterruptedException)) { - LOG.warn("Uncaught exception in thread {}", t.getName(), e); - } - }); + thread.setPriority(Thread.MIN_PRIORITY); + thread.setUncaughtExceptionHandler(UNCAUGHT_EXCEPTION_HANDLER); return thread; }, (r, executor) -> { @@ -141,6 +172,18 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close this.preloadPool = preloadPool; } + @VisibleForTesting + static int adaptPreloadDepth(int maxPreloadDepth, double queueFillPercentage) { + double remainingCapacity = 1.0 - queueFillPercentage; + double capacitySlice = 1.0 / maxPreloadDepth; + for (int i = 1; i < maxPreloadDepth; i++) { + if (remainingCapacity <= i * capacitySlice) { + return i; + } + } + return maxPreloadDepth; + } + @Override protected PersistentCache delegate() { return delegate; @@ -148,21 +191,19 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close @Override public @Nullable Buffer readSegment(long msb, long lsb, @NotNull Callable<Buffer> loader) { - dispatch(msb, lsb); - return delegate().readSegment(msb, lsb, loader); - } - - private void dispatch(long msb, long lsb) { - dispatch(msb, lsb, 1); + Buffer buffer = super.readSegment(msb, lsb, loader); + dispatch(msb, lsb, getReferencedSegments(lsb, buffer), 1); + return buffer; } - private void dispatch(long msb, long lsb, int depth) { - execute(dispatchPool, createDispatchTask(msb, lsb, depth)); + private void dispatch(long msb, long lsb, SegmentIds referencedSegments, int depth) { + if (depth <= preloadDepth && !referencedSegments.isEmpty() && SegmentId.isDataSegmentId(lsb)) { + execute(dispatchPool, createDispatchTask(msb, lsb, referencedSegments, depth)); + } } - @NotNull SegmentPreloader.DispatchTask createDispatchTask(long msb, long lsb, int depth) { - TarFiles tars = tarFiles.get(); - return new DispatchTask(tars, tars::getIndices, msb, lsb, depth); + @NotNull DispatchTask createDispatchTask(long msb, long lsb, SegmentIds referencedSegments, int depth) { + return new DispatchTask(msb, lsb, referencedSegments, depth); } private void preload(long msb, long lsb, int depth) { @@ -205,26 +246,34 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close } } - class DispatchTask implements Runnable, Comparable<DispatchTask> { - - private final TarFiles tarFiles; + private boolean wrapsInterruptedException(RuntimeException e) { + Throwable candidate = e; + while (candidate.getCause() != null) { + if (candidate.getCause() instanceof InterruptedException) { + return true; + } + candidate = candidate.getCause(); + } + return false; + } - private final Supplier<Map<String, Set<UUID>>> indicesSupplier; + class DispatchTask implements Runnable, Comparable<DispatchTask> { private final long msb; private final long lsb; + private final SegmentIds references; + private final int depth; private final long creationTime = System.nanoTime(); - private DispatchTask(@NotNull TarFiles tarFiles, Supplier<Map<String, Set<UUID>>> indicesSupplier, long msb, long lsb, int depth) { - checkArgument(depth <= preloadDepth, "depth must be <= %d, is %d", preloadDepth, depth); - this.tarFiles = tarFiles; - this.indicesSupplier = indicesSupplier; + private DispatchTask(long msb, long lsb, SegmentIds references, int depth) { + checkArgument(depth <= maxPreloadDepth, "depth must be <= %d, is %d", maxPreloadDepth, depth); this.msb = msb; this.lsb = lsb; + this.references = references; this.depth = depth; LOG.debug("Created: {}", this); } @@ -232,30 +281,19 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close @Override public void run() { LOG.debug("Running: {}", this); - UUID uuid = new UUID(msb, lsb); - Map<String, Set<UUID>> indices = indicesSupplier.get(); - String archiveName = indices.entrySet().stream() - .filter(entry -> entry.getValue().contains(uuid)) - .findFirst() - .map(Map.Entry::getKey) - .orElse(null); - - Map<UUID, Set<UUID>> graph = graphCache.computeIfAbsent(archiveName, name -> { - try { - return tarFiles.getGraph(name); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - for (UUID reference : graph.get(uuid)) { - long refMsb = reference.getMostSignificantBits(); - long refLsb = reference.getLeastSignificantBits(); - if (!delegate.containsSegment(refMsb, refLsb)) { + try { + for (int i = 0; i < references.size(); i++) { + long refMsb = references.getMsb(i); + long refLsb = references.getLsb(i); preload(refMsb, refLsb, depth); - } else if (depth < preloadDepth && SegmentId.isDataSegmentId(refLsb)) { - dispatch(refMsb, refLsb, depth + 1); } + } catch (RuntimeException e) { + if (wrapsInterruptedException(e)) { + Thread.currentThread().interrupt(); + return; + } + throw e; } } @@ -278,7 +316,7 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close @Override public String toString() { - return "DispatchTask{segmentId=" + new UUID(msb, lsb) + ", depth=" + depth + '}'; + return "DispatchTask{segmentId=" + new UUID(msb, lsb) + ", depth=" + depth + ", references=" + references.size() + '}'; } private int getPreloadDepth() { @@ -298,6 +336,22 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close } } + private static SegmentIds getReferencedSegments(long lsb, @Nullable Buffer buffer) { + if (buffer == null || !SegmentId.isDataSegmentId(lsb)) { + return SegmentIds.EMPTY; + } + SegmentData segmentData = SegmentData.newSegmentData(buffer); + int referencedSegmentsCount = segmentData.getSegmentReferencesCount(); + if (referencedSegmentsCount == 0) { + return SegmentIds.EMPTY; + } + SegmentIds segmentIds = new SegmentIds(referencedSegmentsCount); + for (int i = 0; i < referencedSegmentsCount; i++) { + segmentIds.add(i, segmentData.getSegmentReferenceMsb(i), segmentData.getSegmentReferenceLsb(i)); + } + return segmentIds; + } + class PreloadTask implements Runnable { private final TarFiles tarFiles; @@ -309,7 +363,7 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close private final int depth; private PreloadTask(TarFiles tarFiles, long msb, long lsb, int depth) { - checkArgument(depth <= preloadDepth, "depth must be <= %d, is %d", preloadDepth, depth); + checkArgument(depth <= maxPreloadDepth, "depth must be <= %d, is %d", maxPreloadDepth, depth); this.tarFiles = tarFiles; this.msb = msb; this.lsb = lsb; @@ -320,14 +374,17 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close @Override public void run() { LOG.debug("Running: {}", this); - if (depth < preloadDepth && SegmentId.isDataSegmentId(lsb)) { - dispatch(msb, lsb, depth + 1); - } - if (!delegate.containsSegment(msb, lsb)) { - Buffer segmentBuffer = tarFiles.readSegment(msb, lsb); - if (segmentBuffer != null) { - delegate.writeSegment(msb, lsb, segmentBuffer); + try { + if (depth < preloadDepth || !containsSegment(msb, lsb)) { + Buffer buffer = delegate().readSegment(msb, lsb, () -> tarFiles.readSegment(msb, lsb)); + dispatch(msb, lsb, getReferencedSegments(lsb, buffer), depth + 1); + } + } catch (RuntimeException e) { + if (wrapsInterruptedException(e)) { + Thread.currentThread().interrupt(); + return; } + throw e; } } diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java index 9be8947db9..37c887ac48 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java @@ -184,22 +184,22 @@ public class SegmentPreloaderTest { long msb = uuid.getMostSignificantBits(); long lsb = uuid.getLeastSignificantBits(); - DispatchTask task1 = preloader.createDispatchTask(msb, lsb, 1); + DispatchTask task1 = createDispatchTask(preloader, msb, lsb, 1); assertEquals(task1, task1); - DispatchTask task2 = preloader.createDispatchTask(msb, lsb, 1); + DispatchTask task2 = createDispatchTask(preloader, msb, lsb, 1); assertEquals(task1, task2); - DispatchTask task3 = preloader.createDispatchTask(msb, lsb, 0); + DispatchTask task3 = createDispatchTask(preloader, msb, lsb, 0); assertNotEquals(task1, task3); - DispatchTask task4 = preloader.createDispatchTask(msb, lsb + 1, 1); + DispatchTask task4 = createDispatchTask(preloader, msb, lsb + 1, 1); assertNotEquals(task1, task4); - DispatchTask task5 = preloader.createDispatchTask(msb + 1, lsb, 1); + DispatchTask task5 = createDispatchTask(preloader, msb + 1, lsb, 1); assertNotEquals(task1, task5); - assertNotEquals(task1, new Object()); + assertNotEquals(new Object(), task1); }); } @@ -207,7 +207,7 @@ public class SegmentPreloaderTest { public void testDispatchTaskArgumentValidation() throws IOException { withSegmentPreloader(preloader -> { UUID uuid = UUID.randomUUID(); - assertThrows(IllegalArgumentException.class, () -> preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 3)); + assertThrows(IllegalArgumentException.class, () -> createDispatchTask(preloader, uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 3)); }); } @@ -216,8 +216,8 @@ public class SegmentPreloaderTest { withSegmentPreloader(preloader -> { UUID uuid = UUID.randomUUID(); assertEquals( - "DispatchTask{segmentId=" + uuid + ", depth=1}", - preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1).toString()); + "DispatchTask{segmentId=" + uuid + ", depth=1, references=0}", + createDispatchTask(preloader, uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1).toString()); }); } @@ -225,9 +225,9 @@ public class SegmentPreloaderTest { public void testDispatchTaskCompareTo() throws IOException { withSegmentPreloader(preloader -> { UUID uuid = UUID.randomUUID(); - DispatchTask task1 = preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2); - DispatchTask task2 = preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1); - DispatchTask task3 = preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2); + DispatchTask task1 = createDispatchTask(preloader, uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2); + DispatchTask task2 = createDispatchTask(preloader, uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1); + DispatchTask task3 = createDispatchTask(preloader, uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2); List<DispatchTask> tasks = new ArrayList<>(); tasks.add(task1); tasks.add(task2); @@ -273,6 +273,15 @@ public class SegmentPreloaderTest { }); } + @Test + public void testAdaptPreloadDepth() { + assertEquals(4, SegmentPreloader.adaptPreloadDepth(4, 0.0)); + assertEquals(3, SegmentPreloader.adaptPreloadDepth(4, 0.25)); + assertEquals(2, SegmentPreloader.adaptPreloadDepth(4, 0.5)); + assertEquals(1, SegmentPreloader.adaptPreloadDepth(4, 0.75)); + assertEquals(1, SegmentPreloader.adaptPreloadDepth(4, 1.0)); + } + private void withSegmentPreloader(Consumer<SegmentPreloader> withPreloader) throws IOException { MemoryTestCache cache = new MemoryTestCache(); PersistentCachePreloadingConfiguration config = @@ -289,6 +298,10 @@ public class SegmentPreloaderTest { return UUID.fromString(parts[0]); } + private static @NotNull DispatchTask createDispatchTask(SegmentPreloader preloader, long msb, long lsb, int depth) { + return preloader.createDispatchTask(msb, lsb, SegmentIds.EMPTY, depth); + } + private void assertReferencedSegmentsLoaded(Set<UUID> referencedSegments, MemoryTestCache underlyingCache, SegmentPreloader preloadingCache) { Set<UUID> segments = new HashSet<>(referencedSegments); int timeoutSec = 10;
