This is an automated email from the ASF dual-hosted git repository.

jsedding pushed a commit to branch jsedding/OAK-12070-reduce-azure-segmentstore-heap-usage
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 718058c4ccd4aa549c64ad0af639cda5af212134
Author: Julian Sedding <[email protected]>
AuthorDate: Wed Jan 21 13:26:46 2026 +0100

    OAK-12069 - reduce SegmentPreloader memory usage
    
    - remove segment-graph cache
    - read segment references from segment data
---
 .../oak/segment/file/preloader/SegmentIds.java     |  64 +++++++
 .../segment/file/preloader/SegmentPreloader.java   | 185 ++++++++++++++-------
 .../file/preloader/SegmentPreloaderTest.java       |  37 +++--
 3 files changed, 210 insertions(+), 76 deletions(-)

diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentIds.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentIds.java
new file mode 100644
index 0000000000..61254bb9ca
--- /dev/null
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentIds.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.file.preloader;
+
+/**
+ * A simple, memory-efficient data structure to hold segment IDs (UUIDs) as pairs of long values
+ * (most significant bits and least significant bits).
+ */
+final class SegmentIds {
+
+    public static final SegmentIds EMPTY = new SegmentIds(0);
+
+    private final long[] segmentIds;
+
+    SegmentIds(int size) {
+        this.segmentIds = new long[size * 2];
+    }
+
+    public void add(int index, long msb, long lsb) {
+        checkIndexWithinBounds(index);
+        segmentIds[index * 2] = msb;
+        segmentIds[index * 2 + 1] = lsb;
+    }
+
+    public long getMsb(int index) {
+        checkIndexWithinBounds(index);
+        return segmentIds[index * 2];
+    }
+
+    public long getLsb(int index) {
+        checkIndexWithinBounds(index);
+        return segmentIds[index * 2 + 1];
+    }
+
+    public int size() {
+        return segmentIds.length / 2;
+    }
+
+    public boolean isEmpty() {
+        return segmentIds.length == 0;
+    }
+
+    private void checkIndexWithinBounds(int index) {
+        if (index < 0 || segmentIds.length / 2 <= index) {
+            throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + (segmentIds.length / 2));
+        }
+    }
+}
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java
index 0a136710fc..49e0f84cb6 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java
@@ -21,22 +21,21 @@ package org.apache.jackrabbit.oak.segment.file.preloader;
 import org.apache.jackrabbit.oak.commons.Buffer;
 import org.apache.jackrabbit.oak.commons.internal.function.Suppliers;
 import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.data.SegmentData;
 import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
 import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.DelegatingPersistentCache;
 import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
 import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -60,9 +59,15 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
 
     private static final Logger LOG = LoggerFactory.getLogger(SegmentPreloader.class);
 
-    private final Map<Integer, String> inProgressPrefetch;
+    private static final Thread.UncaughtExceptionHandler UNCAUGHT_EXCEPTION_HANDLER = (t, e) -> {
+        if (!(e instanceof InterruptedException)) {
+            LOG.warn("Uncaught exception in thread {} ({}, {})", t.getName(), e.getClass(), e.getMessage(), e);
+        }
+    };
+
+    private static final int DISPATCH_QUEUE_MAX_SIZE = 10_000;
 
-    private final ConcurrentHashMap<String, Map<UUID, Set<UUID>>> graphCache;
+    private final Map<Integer, String> inProgressPrefetch;
 
     private final PersistentCache delegate;
 
@@ -70,10 +75,12 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
 
     private final ExecutorService preloadPool;
 
-    private final int preloadDepth;
-
     private final Supplier<TarFiles> tarFiles;
 
+    private final int maxPreloadDepth;
+
+    private volatile int preloadDepth;
+
     /**
      * Factory method that decorates the given {@link PersistentCache} with a
      * {@link SegmentPreloader} if the given configuration requires preloading.
@@ -95,18 +102,45 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
         this.delegate = delegate;
         this.tarFiles = Suppliers.memoize(tarFiles);
         this.inProgressPrefetch = new ConcurrentHashMap<>();
-        this.graphCache = new ConcurrentHashMap<>();
-        this.preloadDepth = config.getMaxPreloadDepth();
+        this.maxPreloadDepth = config.getMaxPreloadDepth();
+        this.preloadDepth = adaptPreloadDepth(this.maxPreloadDepth, 0);
         this.dispatchPool = new ThreadPoolExecutor(1,1,
                 1, TimeUnit.SECONDS,
                 new PriorityBlockingQueue<>(),
-                r -> new Thread(r, "segment-preload-dispatcher")) {
+                r -> {
+                    Thread thread = new Thread(r, "segment-preload-dispatcher");
+                    thread.setUncaughtExceptionHandler(UNCAUGHT_EXCEPTION_HANDLER);
+                    return thread;
+                }) {
+
+            private volatile long lastLoggedTime = System.currentTimeMillis();
+
+            @Override
+            public void execute(@NotNull Runnable command) {
+                if (getQueue().size() < DISPATCH_QUEUE_MAX_SIZE) {
+                    super.execute(command);
+                }
+            }
+
             @Override
             protected void afterExecute(Runnable r, Throwable t) {
                 super.afterExecute(r, t);
                 clearInProgressTask(r);
+                int size = getQueue().size();
+                int adaptedPreloadDepth = adaptPreloadDepth(maxPreloadDepth, size / (double) DISPATCH_QUEUE_MAX_SIZE);
+                if (adaptedPreloadDepth != preloadDepth) {
+                    preloadDepth = adaptedPreloadDepth;
+                    LOG.debug("Adjusted preload depth to {} (queue size: {})", preloadDepth, size);
+                }
+
+                long now = System.currentTimeMillis();
+                if (lastLoggedTime + 15_000 < now) {
+                    lastLoggedTime = now;
+                    LOG.info("Dispatch pool queue size: {}, current preload depth: {}", size, preloadDepth);
+                }
             }
         };
+
         int preloadThreads = config.getConcurrency();
         ThreadPoolExecutor preloadPool = new ThreadPoolExecutor(Math.max(1, preloadThreads / 4), preloadThreads,
                 5, TimeUnit.SECONDS,
@@ -114,11 +148,8 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
                 r -> {
                     String threadName = String.format("segment-preload-%s", Long.toHexString(System.nanoTime() & 0xFFFFF));
                     Thread thread = new Thread(r, threadName);
-                    thread.setUncaughtExceptionHandler((t, e) -> {
-                        if (!(e instanceof InterruptedException)) {
-                            LOG.warn("Uncaught exception in thread {}", t.getName(), e);
-                        }
-                    });
+                    thread.setPriority(Thread.MIN_PRIORITY);
+                    thread.setUncaughtExceptionHandler(UNCAUGHT_EXCEPTION_HANDLER);
                     return thread;
                 },
                 (r, executor) -> {
@@ -141,6 +172,18 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
         this.preloadPool = preloadPool;
     }
 
+    @VisibleForTesting
+    static int adaptPreloadDepth(int maxPreloadDepth, double queueFillPercentage) {
+        double remainingCapacity = 1.0 - queueFillPercentage;
+        double capacitySlice = 1.0 / maxPreloadDepth;
+        for (int i = 1; i < maxPreloadDepth; i++) {
+            if (remainingCapacity <= i * capacitySlice) {
+                return i;
+            }
+        }
+        return maxPreloadDepth;
+    }
+
     @Override
     protected PersistentCache delegate() {
         return delegate;
@@ -148,21 +191,19 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
 
     @Override
     public @Nullable Buffer readSegment(long msb, long lsb, @NotNull Callable<Buffer> loader) {
-        dispatch(msb, lsb);
-        return delegate().readSegment(msb, lsb, loader);
-    }
-
-    private void dispatch(long msb, long lsb) {
-        dispatch(msb, lsb, 1);
+        Buffer buffer = super.readSegment(msb, lsb, loader);
+        dispatch(msb, lsb, getReferencedSegments(lsb, buffer), 1);
+        return buffer;
     }
 
-    private void dispatch(long msb, long lsb, int depth) {
-        execute(dispatchPool, createDispatchTask(msb, lsb, depth));
+    private void dispatch(long msb, long lsb, SegmentIds referencedSegments, int depth) {
+        if (depth <= preloadDepth && !referencedSegments.isEmpty() && SegmentId.isDataSegmentId(lsb)) {
+            execute(dispatchPool, createDispatchTask(msb, lsb, referencedSegments, depth));
+        }
     }
 
-    @NotNull SegmentPreloader.DispatchTask createDispatchTask(long msb, long lsb, int depth) {
-        TarFiles tars = tarFiles.get();
-        return new DispatchTask(tars, tars::getIndices, msb, lsb, depth);
+    @NotNull DispatchTask createDispatchTask(long msb, long lsb, SegmentIds referencedSegments, int depth) {
+        return new DispatchTask(msb, lsb, referencedSegments, depth);
     }
 
     private void preload(long msb, long lsb, int depth) {
@@ -205,26 +246,34 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
         }
     }
 
-    class DispatchTask implements Runnable, Comparable<DispatchTask> {
-
-        private final TarFiles tarFiles;
+    private boolean wrapsInterruptedException(RuntimeException e) {
+        Throwable candidate = e;
+        while (candidate.getCause() != null) {
+            if (candidate.getCause() instanceof InterruptedException) {
+                return true;
+            }
+            candidate = candidate.getCause();
+        }
+        return false;
+    }
 
-        private final Supplier<Map<String, Set<UUID>>> indicesSupplier;
+    class DispatchTask implements Runnable, Comparable<DispatchTask> {
 
         private final long msb;
 
         private final long lsb;
 
+        private final SegmentIds references;
+
         private final int depth;
 
         private final long creationTime = System.nanoTime();
 
-        private DispatchTask(@NotNull TarFiles tarFiles, Supplier<Map<String, Set<UUID>>> indicesSupplier, long msb, long lsb, int depth) {
-            checkArgument(depth <= preloadDepth, "depth must be <= %d, is %d", preloadDepth, depth);
-            this.tarFiles = tarFiles;
-            this.indicesSupplier = indicesSupplier;
+        private DispatchTask(long msb, long lsb, SegmentIds references, int depth) {
+            checkArgument(depth <= maxPreloadDepth, "depth must be <= %d, is %d", maxPreloadDepth, depth);
             this.msb = msb;
             this.lsb = lsb;
+            this.references = references;
             this.depth = depth;
             LOG.debug("Created: {}", this);
         }
@@ -232,30 +281,19 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
         @Override
         public void run() {
             LOG.debug("Running: {}", this);
-            UUID uuid = new UUID(msb, lsb);
-            Map<String, Set<UUID>> indices = indicesSupplier.get();
-            String archiveName = indices.entrySet().stream()
-                    .filter(entry -> entry.getValue().contains(uuid))
-                    .findFirst()
-                    .map(Map.Entry::getKey)
-                    .orElse(null);
-
-            Map<UUID, Set<UUID>> graph = graphCache.computeIfAbsent(archiveName, name -> {
-                try {
-                    return tarFiles.getGraph(name);
-                } catch (IOException e) {
-                    throw new UncheckedIOException(e);
-                }
-            });
 
-            for (UUID reference : graph.get(uuid)) {
-                long refMsb = reference.getMostSignificantBits();
-                long refLsb = reference.getLeastSignificantBits();
-                if (!delegate.containsSegment(refMsb, refLsb)) {
+            try {
+                for (int i = 0; i < references.size(); i++) {
+                    long refMsb = references.getMsb(i);
+                    long refLsb = references.getLsb(i);
                     preload(refMsb, refLsb, depth);
-                } else if (depth < preloadDepth && SegmentId.isDataSegmentId(refLsb)) {
-                    dispatch(refMsb, refLsb, depth + 1);
                 }
+            } catch (RuntimeException e) {
+                if (wrapsInterruptedException(e)) {
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+                throw e;
             }
         }
 
@@ -278,7 +316,7 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
 
         @Override
         public String toString() {
-            return "DispatchTask{segmentId=" + new UUID(msb, lsb) + ", depth=" + depth + '}';
+            return "DispatchTask{segmentId=" + new UUID(msb, lsb) + ", depth=" + depth + ", references=" + references.size() + '}';
         }
 
         private int getPreloadDepth() {
@@ -298,6 +336,22 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
         }
     }
 
+    private static SegmentIds getReferencedSegments(long lsb, @Nullable Buffer buffer) {
+        if (buffer == null || !SegmentId.isDataSegmentId(lsb)) {
+            return SegmentIds.EMPTY;
+        }
+        SegmentData segmentData = SegmentData.newSegmentData(buffer);
+        int referencedSegmentsCount = segmentData.getSegmentReferencesCount();
+        if (referencedSegmentsCount == 0) {
+            return SegmentIds.EMPTY;
+        }
+        SegmentIds segmentIds = new SegmentIds(referencedSegmentsCount);
+        for (int i = 0; i < referencedSegmentsCount; i++) {
+            segmentIds.add(i, segmentData.getSegmentReferenceMsb(i), segmentData.getSegmentReferenceLsb(i));
+        }
+        return segmentIds;
+    }
+
     class PreloadTask implements Runnable {
 
         private final TarFiles tarFiles;
@@ -309,7 +363,7 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
         private final int depth;
 
         private PreloadTask(TarFiles tarFiles, long msb, long lsb, int depth) {
-            checkArgument(depth <= preloadDepth, "depth must be <= %d, is %d", preloadDepth, depth);
+            checkArgument(depth <= maxPreloadDepth, "depth must be <= %d, is %d", maxPreloadDepth, depth);
             this.tarFiles = tarFiles;
             this.msb = msb;
             this.lsb = lsb;
@@ -320,14 +374,17 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
         @Override
         public void run() {
             LOG.debug("Running: {}", this);
-            if (depth < preloadDepth && SegmentId.isDataSegmentId(lsb)) {
-                dispatch(msb, lsb, depth + 1);
-            }
-            if (!delegate.containsSegment(msb, lsb)) {
-                Buffer segmentBuffer = tarFiles.readSegment(msb, lsb);
-                if (segmentBuffer != null) {
-                    delegate.writeSegment(msb, lsb, segmentBuffer);
+            try {
+                if (depth < preloadDepth || !containsSegment(msb, lsb)) {
+                    Buffer buffer = delegate().readSegment(msb, lsb, () -> tarFiles.readSegment(msb, lsb));
+                    dispatch(msb, lsb, getReferencedSegments(lsb, buffer), depth + 1);
+                }
+            } catch (RuntimeException e) {
+                if (wrapsInterruptedException(e)) {
+                    Thread.currentThread().interrupt();
+                    return;
                 }
+                throw e;
             }
         }
 
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java
index 9be8947db9..37c887ac48 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java
@@ -184,22 +184,22 @@ public class SegmentPreloaderTest {
             long msb = uuid.getMostSignificantBits();
             long lsb = uuid.getLeastSignificantBits();
 
-            DispatchTask task1 = preloader.createDispatchTask(msb, lsb, 1);
+            DispatchTask task1 = createDispatchTask(preloader, msb, lsb, 1);
             assertEquals(task1, task1);
 
-            DispatchTask task2 = preloader.createDispatchTask(msb, lsb, 1);
+            DispatchTask task2 = createDispatchTask(preloader, msb, lsb, 1);
             assertEquals(task1, task2);
 
-            DispatchTask task3 = preloader.createDispatchTask(msb, lsb, 0);
+            DispatchTask task3 = createDispatchTask(preloader, msb, lsb, 0);
             assertNotEquals(task1, task3);
 
-            DispatchTask task4 = preloader.createDispatchTask(msb, lsb + 1, 1);
+            DispatchTask task4 = createDispatchTask(preloader, msb, lsb + 1, 1);
             assertNotEquals(task1, task4);
 
-            DispatchTask task5 = preloader.createDispatchTask(msb + 1, lsb, 1);
+            DispatchTask task5 = createDispatchTask(preloader, msb + 1, lsb, 1);
             assertNotEquals(task1, task5);
 
-            assertNotEquals(task1, new Object());
+            assertNotEquals(new Object(), task1);
         });
     }
 
@@ -207,7 +207,7 @@ public class SegmentPreloaderTest {
     public void testDispatchTaskArgumentValidation() throws IOException {
         withSegmentPreloader(preloader -> {
             UUID uuid = UUID.randomUUID();
-            assertThrows(IllegalArgumentException.class, () -> preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 3));
+            assertThrows(IllegalArgumentException.class, () -> createDispatchTask(preloader, uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 3));
         });
     }
 
@@ -216,8 +216,8 @@ public class SegmentPreloaderTest {
         withSegmentPreloader(preloader -> {
             UUID uuid = UUID.randomUUID();
             assertEquals(
-                    "DispatchTask{segmentId=" + uuid + ", depth=1}",
-                    preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1).toString());
+                    "DispatchTask{segmentId=" + uuid + ", depth=1, references=0}",
+                    createDispatchTask(preloader, uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1).toString());
         });
     }
 
@@ -225,9 +225,9 @@ public class SegmentPreloaderTest {
     public void testDispatchTaskCompareTo() throws IOException {
         withSegmentPreloader(preloader -> {
             UUID uuid = UUID.randomUUID();
-            DispatchTask task1 = preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2);
-            DispatchTask task2 = preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1);
-            DispatchTask task3 = preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2);
+            DispatchTask task1 = createDispatchTask(preloader, uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2);
+            DispatchTask task2 = createDispatchTask(preloader, uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1);
+            DispatchTask task3 = createDispatchTask(preloader, uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2);
             List<DispatchTask> tasks = new ArrayList<>();
             tasks.add(task1);
             tasks.add(task2);
@@ -273,6 +273,15 @@ public class SegmentPreloaderTest {
         });
     }
 
+    @Test
+    public void testAdaptPreloadDepth() {
+        assertEquals(4, SegmentPreloader.adaptPreloadDepth(4, 0.0));
+        assertEquals(3, SegmentPreloader.adaptPreloadDepth(4, 0.25));
+        assertEquals(2, SegmentPreloader.adaptPreloadDepth(4, 0.5));
+        assertEquals(1, SegmentPreloader.adaptPreloadDepth(4, 0.75));
+        assertEquals(1, SegmentPreloader.adaptPreloadDepth(4, 1.0));
+    }
+
     private void withSegmentPreloader(Consumer<SegmentPreloader> withPreloader) throws IOException {
         MemoryTestCache cache = new MemoryTestCache();
         PersistentCachePreloadingConfiguration config =
@@ -289,6 +298,10 @@ public class SegmentPreloaderTest {
         return UUID.fromString(parts[0]);
     }
 
+    private static @NotNull DispatchTask createDispatchTask(SegmentPreloader preloader, long msb, long lsb, int depth) {
+        return preloader.createDispatchTask(msb, lsb, SegmentIds.EMPTY, depth);
+    }
+
     private void assertReferencedSegmentsLoaded(Set<UUID> referencedSegments, MemoryTestCache underlyingCache, SegmentPreloader preloadingCache) {
         Set<UUID> segments = new HashSet<>(referencedSegments);
         int timeoutSec = 10;

Reply via email to