nfsantos commented on code in PR #2513:
URL: https://github.com/apache/jackrabbit-oak/pull/2513#discussion_r2363106809
##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java:
##########
@@ -19,34 +19,123 @@
 package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;

 import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.segment.data.SegmentData;
 import org.apache.jackrabbit.oak.segment.file.tar.SegmentGraph;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;

 public class CachingSegmentArchiveReader implements SegmentArchiveReader {

+    private static final Logger LOG = LoggerFactory.getLogger(CachingSegmentArchiveReader.class);
+
+    @NotNull
     private final PersistentCache persistentCache;

     @NotNull
     private final SegmentArchiveReader delegate;

+    private final ExecutorService prefetchExecutor;
+    private final Set<UUID> inFlightPrefetch =
+            Collections.newSetFromMap(
+                    new ConcurrentHashMap<>());
+    private final boolean prefetchEnabled;
+    private final int prefetchMaxRefs;
+
+
     public CachingSegmentArchiveReader(
             @NotNull PersistentCache persistentCache,
             @NotNull SegmentArchiveReader delegate) {
         this.persistentCache = persistentCache;
         this.delegate = delegate;
+        int threads = Integer.getInteger("oak.segment.cache.threads", 10);
+        this.prefetchEnabled = Boolean.getBoolean("oak.segment.cache.prefetch.enabled");
+        this.prefetchMaxRefs = Integer.getInteger("oak.segment.cache.prefetch.maxRefs", 20);
+        this.prefetchExecutor = Executors.newFixedThreadPool(threads);
     }

     @Override
     @Nullable
     public Buffer readSegment(long msb, long lsb) throws IOException {
-        return persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb));
+        Buffer buf = persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb));
+        if (buf != null && prefetchEnabled) {
+            schedulePrefetch(msb, lsb, buf);
+        }
+        return buf;
+    }
+
+    private List<UUID> extractReferences(Buffer buffer) {
+        var data = SegmentData.newSegmentData(buffer);
+        int refs = data.getSegmentReferencesCount();
+        ArrayList<UUID> out = new ArrayList<>(refs);
+        for (int i = 0; i < refs; i++) {
+            out.add(new UUID(data.getSegmentReferenceMsb(i), data.getSegmentReferenceLsb(i)));
+        }
+        return out;
+    }
+
+    private void schedulePrefetch(long msb, long lsb, Buffer buffer) {
+        try {
+            List<UUID> refs = extractReferences(buffer);
+            int limit = Math.min(refs.size(), prefetchMaxRefs);
+            for (int i = 0; i < limit; i++) {

Review Comment:
You are getting a list with all the references, but then potentially iterating over only a subset of them. You could save some work by extracting only the references that will actually be prefetched, which would also avoid allocating a list holding all the refs. Using streams with `limit()` may be an easy way of implementing this optimization, as streams are computed lazily.
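For example, a minimal sketch of that suggestion (not the PR's code; it assumes `schedulePrefetch` passes its `prefetchMaxRefs` bound down as a parameter):

```java
// Sketch: extract at most maxRefs references. The stream is evaluated
// lazily, so references beyond the limit are never materialized.
// Requires: import java.util.stream.Collectors; import java.util.stream.IntStream;
private List<UUID> extractReferences(Buffer buffer, int maxRefs) {
    SegmentData data = SegmentData.newSegmentData(buffer);
    return IntStream.range(0, data.getSegmentReferencesCount())
            .limit(maxRefs)
            .mapToObj(i -> new UUID(
                    data.getSegmentReferenceMsb(i),
                    data.getSegmentReferenceLsb(i)))
            .collect(Collectors.toList());
}
```

`schedulePrefetch` could then iterate the returned list directly and drop the separate `Math.min` bound.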
##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java:
##########
@@ -19,34 +19,123 @@
+    private void schedulePrefetch(long msb, long lsb, Buffer buffer) {
+        try {
+            List<UUID> refs = extractReferences(buffer);
+            int limit = Math.min(refs.size(), prefetchMaxRefs);
+            for (int i = 0; i < limit; i++) {
+                final UUID ref = refs.get(i);
+                final long rMsb = ref.getMostSignificantBits();
+                final long rLsb = ref.getLeastSignificantBits();
+
+                // Skip if already present in cache
+                if (persistentCache.containsSegment(rMsb, rLsb)) {
+                    continue;
+                }

Review Comment:
Maybe it's better to do this check in the worker thread, just before the segment is downloaded? The task that is scheduled to download the segment might not execute for a while if the worker pool is busy, so between the moment the check runs here and the actual download, the segment might have been added to the cache. Or we can leave the check here and do another one just before downloading.
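Something along these lines (illustrative only, reusing the names from this PR; the download step is elided):

```java
// Sketch: re-check the cache inside the submitted task, so the decision is
// made right before the download rather than at scheduling time.
prefetchExecutor.submit(() -> {
    // The segment may have been cached while this task sat in the queue.
    if (persistentCache.containsSegment(rMsb, rLsb)) {
        return;
    }
    // ... download the segment and write it to the persistent cache ...
});
```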
It would also be better to have a mechanism similar to Guava's loading cache, where a thread requesting a segment that is not in the cache but is currently being downloaded blocks until the first download completes, instead of starting a new download of its own. This would avoid duplicate downloads.
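A rough sketch of such a single-flight mechanism (hypothetical code, not part of this PR; `readThroughOnce` is an invented name, while `delegate` and `prefetchExecutor` are the PR's existing fields):

```java
// Hypothetical "single flight" map: concurrent requests for the same segment
// share one download instead of each triggering their own.
// Requires: import java.io.UncheckedIOException;
//           import java.util.concurrent.CompletableFuture;
private final ConcurrentHashMap<UUID, CompletableFuture<Buffer>> inFlight =
        new ConcurrentHashMap<>();

private Buffer readThroughOnce(long msb, long lsb) {
    UUID id = new UUID(msb, lsb);
    CompletableFuture<Buffer> future = inFlight.computeIfAbsent(id, k ->
            CompletableFuture.supplyAsync(() -> {
                try {
                    // Only the first caller for this id reaches here.
                    return delegate.readSegment(msb, lsb);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, prefetchExecutor));
    try {
        // Every other caller blocks here until the first download finishes.
        return future.join();
    } finally {
        // Drop the entry so a failed download can be retried later.
        inFlight.remove(id, future);
    }
}
```

The PR's `inFlightPrefetch` set could be replaced by such a map; Guava's `LoadingCache` provides the same per-key blocking behavior out of the box.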