jsedding commented on code in PR #2513: URL: https://github.com/apache/jackrabbit-oak/pull/2513#discussion_r2359952723
########## oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java: ########## @@ -19,34 +19,123 @@ package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; +import org.apache.jackrabbit.oak.segment.data.SegmentData; import org.apache.jackrabbit.oak.segment.file.tar.SegmentGraph; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class CachingSegmentArchiveReader implements SegmentArchiveReader { + private static final Logger LOG = LoggerFactory.getLogger(CachingSegmentArchiveReader.class); + + @NotNull private final PersistentCache persistentCache; @NotNull private final SegmentArchiveReader delegate; + private final ExecutorService prefetchExecutor; + private final Set<UUID> inFlightPrefetch = + Collections.newSetFromMap( + new ConcurrentHashMap<>()); + private final boolean prefetchEnabled; + private final int prefetchMaxRefs; + + + public CachingSegmentArchiveReader( @NotNull PersistentCache persistentCache, @NotNull SegmentArchiveReader delegate) { this.persistentCache = persistentCache; this.delegate = delegate; + int threads = Integer.getInteger("oak.segment.cache.threads", 10); + this.prefetchEnabled = Boolean.getBoolean("oak.segment.cache.prefetch.enabled"); + this.prefetchMaxRefs = 
Integer.getInteger("oak.segment.cache.prefetch.maxRefs", 20); + this.prefetchExecutor = Executors.newFixedThreadPool(threads); } @Override @Nullable public Buffer readSegment(long msb, long lsb) throws IOException { - return persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb)); + Buffer buf = persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb)); + if (buf != null && prefetchEnabled) { + schedulePrefetch(msb, lsb, buf); + } + return buf; + } + + private List<UUID> extractReferences(Buffer buffer) { + var data = SegmentData.newSegmentData(buffer); + int refs = data.getSegmentReferencesCount(); + ArrayList<UUID> out = new ArrayList<>(refs); + for (int i = 0; i < refs; i++) { + out.add(new UUID(data.getSegmentReferenceMsb(i), data.getSegmentReferenceLsb(i))); + } + return out; + } + + private void schedulePrefetch(long msb, long lsb, Buffer buffer) { + try { + List<UUID> refs = extractReferences(buffer); + int limit = Math.min(refs.size(), prefetchMaxRefs); + for (int i = 0; i < limit; i++) { + final UUID ref = refs.get(i); + final long rMsb = ref.getMostSignificantBits(); + final long rLsb = ref.getLeastSignificantBits(); + + // Skip if already present in cache + if (persistentCache.containsSegment(rMsb, rLsb)) { + continue; + } + + // Drop prefetch if already in progress for this segment + boolean registered = inFlightPrefetch.add(ref); Review Comment: Nice idea! 
I had missed this trick in #2519 so far 🙂 ########## oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java: ########## @@ -19,34 +19,123 @@ package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; +import org.apache.jackrabbit.oak.segment.data.SegmentData; import org.apache.jackrabbit.oak.segment.file.tar.SegmentGraph; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class CachingSegmentArchiveReader implements SegmentArchiveReader { + private static final Logger LOG = LoggerFactory.getLogger(CachingSegmentArchiveReader.class); + + @NotNull private final PersistentCache persistentCache; @NotNull private final SegmentArchiveReader delegate; + private final ExecutorService prefetchExecutor; + private final Set<UUID> inFlightPrefetch = + Collections.newSetFromMap( + new ConcurrentHashMap<>()); + private final boolean prefetchEnabled; + private final int prefetchMaxRefs; + + + public CachingSegmentArchiveReader( @NotNull PersistentCache persistentCache, @NotNull SegmentArchiveReader delegate) { this.persistentCache = persistentCache; this.delegate = delegate; + int threads = Integer.getInteger("oak.segment.cache.threads", 10); + this.prefetchEnabled = Boolean.getBoolean("oak.segment.cache.prefetch.enabled"); +
this.prefetchMaxRefs = Integer.getInteger("oak.segment.cache.prefetch.maxRefs", 20); + this.prefetchExecutor = Executors.newFixedThreadPool(threads); } @Override @Nullable public Buffer readSegment(long msb, long lsb) throws IOException { - return persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb)); + Buffer buf = persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb)); + if (buf != null && prefetchEnabled) { + schedulePrefetch(msb, lsb, buf); + } + return buf; + } + + private List<UUID> extractReferences(Buffer buffer) { + var data = SegmentData.newSegmentData(buffer); + int refs = data.getSegmentReferencesCount(); + ArrayList<UUID> out = new ArrayList<>(refs); + for (int i = 0; i < refs; i++) { + out.add(new UUID(data.getSegmentReferenceMsb(i), data.getSegmentReferenceLsb(i))); + } + return out; + } + + private void schedulePrefetch(long msb, long lsb, Buffer buffer) { Review Comment: Indeed, my tests with #2519, which implements a similar mechanism but at a different level of abstraction, showed that dispatching from the caller thread hurts performance. I even opted to trigger preloading only from within the load callback, so that preloading is completely off the critical path for cache hits. Of course, that's a trade-off, as it requires the caller thread to load one segment before any preloading happens. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: oak-dev-unsubscr...@jackrabbit.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org