nfsantos commented on code in PR #2513:
URL: https://github.com/apache/jackrabbit-oak/pull/2513#discussion_r2363106809


##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java:
##########
@@ -19,34 +19,123 @@
 package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.segment.data.SegmentData;
 import org.apache.jackrabbit.oak.segment.file.tar.SegmentGraph;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 public class CachingSegmentArchiveReader implements SegmentArchiveReader {
 
+    private static final Logger LOG = LoggerFactory.getLogger(CachingSegmentArchiveReader.class);
+
+
     @NotNull
     private final PersistentCache persistentCache;
 
     @NotNull
     private final SegmentArchiveReader delegate;
 
+    private final ExecutorService prefetchExecutor;
+    private final Set<UUID> inFlightPrefetch =
+            Collections.newSetFromMap(
+                    new ConcurrentHashMap<>());
+    private final boolean prefetchEnabled;
+    private final int prefetchMaxRefs;
+
+
+
     public CachingSegmentArchiveReader(
             @NotNull PersistentCache persistentCache,
             @NotNull SegmentArchiveReader delegate) {
         this.persistentCache = persistentCache;
         this.delegate = delegate;
+        int threads = Integer.getInteger("oak.segment.cache.threads", 10);
+        this.prefetchEnabled = Boolean.getBoolean("oak.segment.cache.prefetch.enabled");
+        this.prefetchMaxRefs = Integer.getInteger("oak.segment.cache.prefetch.maxRefs", 20);
+        this.prefetchExecutor = Executors.newFixedThreadPool(threads);
     }
 
     @Override
     @Nullable
     public Buffer readSegment(long msb, long lsb) throws IOException {
-        return persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb));
+        Buffer buf = persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb));
+        if (buf != null && prefetchEnabled) {
+            schedulePrefetch(msb, lsb, buf);
+        }
+        return buf;
+    }
+
+    private List<UUID> extractReferences(Buffer buffer) {
+        var data = SegmentData.newSegmentData(buffer);
+        int refs = data.getSegmentReferencesCount();
+        ArrayList<UUID> out = new ArrayList<>(refs);
+        for (int i = 0; i < refs; i++) {
+            out.add(new UUID(data.getSegmentReferenceMsb(i), data.getSegmentReferenceLsb(i)));
+        }
+        return out;
+    }
+
+    private void schedulePrefetch(long msb, long lsb, Buffer buffer) {
+        try {
+            List<UUID> refs = extractReferences(buffer);
+            int limit = Math.min(refs.size(), prefetchMaxRefs);
+            for (int i = 0; i < limit; i++) {

Review Comment:
   You are getting a list with all the references but then potentially iterating over only a subset of them. You could save some work by extracting only the references that will actually be prefetched; this would also avoid allocating a list holding all the refs. Using a stream with `limit()` may be an easy way of implementing this optimization, since streams are evaluated lazily.
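
   A minimal sketch of what that could look like, assuming the `SegmentData` accessors and the `prefetchMaxRefs` field already present in this class (`extractReferencesToPrefetch` is an illustrative name, and the snippet is untested):

   ```java
   // requires java.util.stream.IntStream and java.util.stream.Collectors
   private List<UUID> extractReferencesToPrefetch(Buffer buffer) {
       var data = SegmentData.newSegmentData(buffer);
       // The stream is evaluated lazily: limit() stops iteration after the
       // prefetch budget, so UUIDs beyond it are never constructed and no
       // list of all references is materialized.
       return IntStream.range(0, data.getSegmentReferencesCount())
               .limit(prefetchMaxRefs)
               .mapToObj(i -> new UUID(
                       data.getSegmentReferenceMsb(i),
                       data.getSegmentReferenceLsb(i)))
               .collect(Collectors.toList());
   }
   ```

   `schedulePrefetch` could then iterate the returned list directly, without the separate `Math.min` bound.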



##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java:
##########
@@ -19,34 +19,123 @@
 package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.segment.data.SegmentData;
 import org.apache.jackrabbit.oak.segment.file.tar.SegmentGraph;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 public class CachingSegmentArchiveReader implements SegmentArchiveReader {
 
+    private static final Logger LOG = LoggerFactory.getLogger(CachingSegmentArchiveReader.class);
+
+
     @NotNull
     private final PersistentCache persistentCache;
 
     @NotNull
     private final SegmentArchiveReader delegate;
 
+    private final ExecutorService prefetchExecutor;
+    private final Set<UUID> inFlightPrefetch =
+            Collections.newSetFromMap(
+                    new ConcurrentHashMap<>());
+    private final boolean prefetchEnabled;
+    private final int prefetchMaxRefs;
+
+
+
     public CachingSegmentArchiveReader(
             @NotNull PersistentCache persistentCache,
             @NotNull SegmentArchiveReader delegate) {
         this.persistentCache = persistentCache;
         this.delegate = delegate;
+        int threads = Integer.getInteger("oak.segment.cache.threads", 10);
+        this.prefetchEnabled = Boolean.getBoolean("oak.segment.cache.prefetch.enabled");
+        this.prefetchMaxRefs = Integer.getInteger("oak.segment.cache.prefetch.maxRefs", 20);
+        this.prefetchExecutor = Executors.newFixedThreadPool(threads);
     }
 
     @Override
     @Nullable
     public Buffer readSegment(long msb, long lsb) throws IOException {
-        return persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb));
+        Buffer buf = persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb));
+        if (buf != null && prefetchEnabled) {
+            schedulePrefetch(msb, lsb, buf);
+        }
+        return buf;
+    }
+
+    private List<UUID> extractReferences(Buffer buffer) {
+        var data = SegmentData.newSegmentData(buffer);
+        int refs = data.getSegmentReferencesCount();
+        ArrayList<UUID> out = new ArrayList<>(refs);
+        for (int i = 0; i < refs; i++) {
+            out.add(new UUID(data.getSegmentReferenceMsb(i), data.getSegmentReferenceLsb(i)));
+        }
+        return out;
+    }
+
+    private void schedulePrefetch(long msb, long lsb, Buffer buffer) {
+        try {
+            List<UUID> refs = extractReferences(buffer);
+            int limit = Math.min(refs.size(), prefetchMaxRefs);
+            for (int i = 0; i < limit; i++) {
+                final UUID ref = refs.get(i);
+                final long rMsb = ref.getMostSignificantBits();
+                final long rLsb = ref.getLeastSignificantBits();
+
+                // Skip if already present in cache
+                if (persistentCache.containsSegment(rMsb, rLsb)) {
+                    continue;
+                }

Review Comment:
   Maybe it's better to do this check in the worker thread, just before the segment is downloaded? The task that is scheduled to download the segment might not execute for a while if the worker pool is busy, so between this point and the actual download the segment might have been added to the cache. Alternatively, we could leave the check here and do another one just before downloading.
   It would also be better to have a mechanism similar to the Guava loading cache, where a thread requesting a segment that is not in the cache but is already being downloaded blocks until the first download completes, instead of starting a new download. This would avoid duplicate downloads.
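
   One way to implement that coalescing without pulling in Guava is to keep the in-flight downloads in a map of futures instead of a plain `Set<UUID>`. The following is an untested sketch; `inFlightDownloads` and `prefetchSegment` are illustrative names, while `persistentCache.containsSegment`, `persistentCache.readSegment`, and `delegate.readSegment` are the calls already used in this class:

   ```java
   // requires java.util.concurrent.CompletableFuture
   private final ConcurrentHashMap<UUID, CompletableFuture<Void>> inFlightDownloads =
           new ConcurrentHashMap<>();

   private void prefetchSegment(long msb, long lsb) {
       UUID id = new UUID(msb, lsb);
       // computeIfAbsent ensures at most one download task per segment UUID.
       inFlightDownloads.computeIfAbsent(id, key -> CompletableFuture
               .runAsync(() -> {
                   try {
                       // Re-check just before downloading: the segment may have
                       // been cached while this task was queued behind a busy pool.
                       if (!persistentCache.containsSegment(msb, lsb)) {
                           persistentCache.readSegment(msb, lsb,
                                   () -> delegate.readSegment(msb, lsb));
                       }
                   } catch (Exception e) {
                       LOG.warn("Prefetch of segment {} failed", key, e);
                   }
               }, prefetchExecutor)
               .whenComplete((result, error) -> inFlightDownloads.remove(key)));
   }
   ```

   A cache miss in `readSegment` could then `join()` the future registered for that UUID, if any, before falling back to the delegate, so that concurrent requests for a segment being downloaded wait for the single in-flight download rather than each starting their own.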


