Author: frm
Date: Tue Apr  4 09:49:13 2017
New Revision: 1790084

URL: http://svn.apache.org/viewvc?rev=1790084&view=rev
Log:
OAK-6002 - Encapsulate TAR files handling in its own class
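
This commit replaces FileStore's List<TarReader>/TarWriter pair and the
fileStoreLock with a single TarFiles facade. A minimal sketch of the resulting
shape, assuming simplified signatures (the real class is added below as
TarFiles.java):

    import java.nio.ByteBuffer;
    import java.util.UUID;

    class StoreSketch {

        // Stand-in for the new facade: it owns the TAR readers, the writer,
        // and the read/write lock that callers previously managed themselves.
        interface SegmentFiles {
            boolean containsSegment(long msb, long lsb);
            ByteBuffer readSegment(long msb, long lsb);
        }

        private final SegmentFiles tarFiles;

        StoreSketch(SegmentFiles tarFiles) {
            this.tarFiles = tarFiles;
        }

        boolean containsSegment(UUID id) {
            // Stores now delegate instead of iterating over TAR readers
            // while holding a store-wide lock.
            return tarFiles.containsSegment(id.getMostSignificantBits(),
                    id.getLeastSignificantBits());
        }
    }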

Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java?rev=1790084&r1=1790083&r2=1790084&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java Tue Apr  4 09:49:13 2017
@@ -18,6 +18,7 @@
  */
 package org.apache.jackrabbit.oak.segment.file;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Maps.newHashMap;
 
@@ -25,8 +26,12 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -34,6 +39,7 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
 import com.google.common.base.Supplier;
+import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
 import org.apache.jackrabbit.oak.segment.RecordType;
@@ -46,6 +52,7 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.SegmentIdFactory;
 import org.apache.jackrabbit.oak.segment.SegmentIdProvider;
 import org.apache.jackrabbit.oak.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.segment.SegmentReader;
 import org.apache.jackrabbit.oak.segment.SegmentStore;
 import org.apache.jackrabbit.oak.segment.SegmentTracker;
@@ -76,11 +83,14 @@ public abstract class AbstractFileStore
      */
     static final int CURRENT_STORE_VERSION = 1;
 
-    private static final Pattern FILE_NAME_PATTERN =
-            Pattern.compile("(data)((0|[1-9][0-9]*)[0-9]{4})([a-z])?.tar");
-
     static final String FILE_NAME_FORMAT = "data%05d%s.tar";
 
+    protected static boolean notEmptyDirectory(File path) {
+        Collection<File> entries = FileUtils.listFiles(path, new String[] {"tar"}, false);
+        checkArgument(entries != null, "%s is not a directory, or an I/O error occurred", path);
+        return entries.size() > 0;
+    }
+
     @Nonnull
     final SegmentTracker tracker;
 
@@ -127,7 +137,14 @@ public abstract class AbstractFileStore
         this.ioMonitor = builder.getIOMonitor();
     }
 
-     File getManifestFile() {
+    static SegmentNotFoundException asSegmentNotFoundException(ExecutionException e, SegmentId id) {
+        if (e.getCause() instanceof SegmentNotFoundException) {
+            return (SegmentNotFoundException) e.getCause();
+        }
+        return new SegmentNotFoundException(id, e);
+    }
+
+    File getManifestFile() {
         return new File(directory, MANIFEST_FILE_NAME);
     }
 
@@ -182,35 +199,6 @@ public abstract class AbstractFileStore
         return segmentReader.getTemplateCacheStats();
     }
 
-    static Map<Integer, Map<Character, File>> collectFiles(File directory) {
-        Map<Integer, Map<Character, File>> dataFiles = newHashMap();
-
-        for (File file : listFiles(directory)) {
-            Matcher matcher = FILE_NAME_PATTERN.matcher(file.getName());
-            if (matcher.matches()) {
-                Integer index = Integer.parseInt(matcher.group(2));
-                Map<Character, File> files = dataFiles.get(index);
-                if (files == null) {
-                    files = newHashMap();
-                    dataFiles.put(index, files);
-                }
-                Character generation = 'a';
-                if (matcher.group(4) != null) {
-                    generation = matcher.group(4).charAt(0);
-                }
-                checkState(files.put(generation, file) == null);
-            }
-        }
-
-        return dataFiles;
-    }
-
-    @Nonnull
-    private static File[] listFiles(File directory) {
-        File[] files = directory.listFiles();
-        return files == null ? new File[] {} : files;
-    }
-
     @Nonnull
     public abstract SegmentWriter getWriter();
 
@@ -285,6 +273,29 @@ public abstract class AbstractFileStore
         });
     }
 
+    static Set<UUID> readReferences(Segment segment) {
+        Set<UUID> references = new HashSet<>();
+        for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
+            references.add(segment.getReferencedSegmentId(i));
+        }
+        return references;
+    }
+
+    static Set<String> readBinaryReferences(final Segment segment) {
+        final Set<String> binaryReferences = new HashSet<>();
+        segment.forEachRecord(new RecordConsumer() {
+
+            @Override
+            public void consume(int number, RecordType type, int offset) {
+                if (type == RecordType.BLOB_ID) {
+                    binaryReferences.add(SegmentBlob.readBlobId(segment, number));
+                }
+            }
+
+        });
+        return binaryReferences;
+    }
+
     static void closeAndLogOnFail(Closeable closeable) {
         if (closeable != null) {
             try {
@@ -296,4 +307,12 @@ public abstract class AbstractFileStore
         }
     }
 
+    Segment readSegmentUncached(TarFiles tarFiles, SegmentId id) {
+        ByteBuffer buffer = tarFiles.readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
+        if (buffer == null) {
+            throw new SegmentNotFoundException(id);
+        }
+        return new Segment(tracker, segmentReader, id, buffer);
+    }
+
 }
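
The helpers added above factor segment post-processing out of FileStore:
readReferences collects the IDs of directly referenced segments,
readBinaryReferences collects BLOB_ID records, and readSegmentUncached maps a
missed TarFiles lookup to a SegmentNotFoundException. A hedged sketch of how a
subclass combines the first two before persisting a segment (collectMetadata
is a hypothetical name, not part of this commit):

    import java.util.Set;
    import java.util.UUID;
    import org.apache.jackrabbit.oak.segment.Segment;

    // Inside a store extending AbstractFileStore:
    void collectMetadata(Segment segment) {
        // Graph edges: the segments this segment points to.
        Set<UUID> references = readReferences(segment);
        // Binary references: the blob IDs recorded in the segment.
        Set<String> binaryReferences = readBinaryReferences(segment);
        // Both sets are later handed to TarFiles#writeSegment, which stores
        // them in the TAR index next to the segment data.
    }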

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1790084&r1=1790083&r2=1790084&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Tue Apr  4 09:49:13 2017
@@ -18,10 +18,6 @@
  */
 package org.apache.jackrabbit.oak.segment.file;
 
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static com.google.common.collect.Lists.newLinkedList;
-import static com.google.common.collect.Maps.newLinkedHashMap;
 import static com.google.common.collect.Sets.newHashSet;
 import static java.lang.Integer.getInteger;
 import static java.lang.String.format;
@@ -48,10 +44,7 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
-import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -59,8 +52,6 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
@@ -72,7 +63,6 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
 import com.google.common.io.Closer;
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
 import org.apache.jackrabbit.oak.segment.Compactor;
@@ -86,6 +76,7 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.WriterCacheManager;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.segment.file.GCJournal.GCJournalEntry;
+import org.apache.jackrabbit.oak.segment.file.TarFiles.CleanupResult;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
@@ -95,6 +86,7 @@ import org.slf4j.LoggerFactory;
  * The storage implementation for tar files.
  */
 public class FileStore extends AbstractFileStore {
+
     private static final Logger log = LoggerFactory.getLogger(FileStore.class);
 
     /**
@@ -121,9 +113,7 @@ public class FileStore extends AbstractF
     @Nonnull
     private final GarbageCollector garbageCollector;
 
-    private volatile List<TarReader> readers;
-
-    private volatile TarWriter tarWriter;
+    private final TarFiles tarFiles;
 
     private final RandomAccessFile lockFile;
 
@@ -160,8 +150,6 @@ public class FileStore extends AbstractF
      */
     private volatile boolean shutdown;
 
-    private final ReadWriteLock fileStoreLock = new ReentrantReadWriteLock();
-
     private final FileStoreStats stats;
 
     @Nonnull
@@ -192,30 +180,25 @@ public class FileStore extends AbstractF
         this.garbageCollector = new GarbageCollector(
                 builder.getGcOptions(), builder.getGcListener(), new GCJournal(directory), builder.getCacheManager());
 
-        Map<Integer, Map<Character, File>> map = collectFiles(directory);
-
         Manifest manifest = Manifest.empty();
 
-        if (!map.isEmpty()) {
+        if (notEmptyDirectory(directory)) {
             manifest = checkManifest(openManifest());
         }
 
         saveManifest(manifest);
 
-        this.readers = newArrayListWithCapacity(map.size());
-        Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
-        Arrays.sort(indices);
-        for (int i = indices.length - 1; i >= 0; i--) {
-            readers.add(TarReader.open(map.get(indices[i]), memoryMapping, recovery, ioMonitor));
-        }
-        this.stats = new FileStoreStats(builder.getStatsProvider(), this, size());
-
-        int writeNumber = 0;
-        if (indices.length > 0) {
-            writeNumber = indices[indices.length - 1] + 1;
-        }
-        this.tarWriter = new TarWriter(directory, stats, writeNumber, ioMonitor);
-        
+        this.stats = new FileStoreStats(builder.getStatsProvider(), this, 0);
+        this.tarFiles = TarFiles.builder()
+                .withDirectory(directory)
+                .withMemoryMapping(memoryMapping)
+                .withTarRecovery(recovery)
+                .withIOMonitor(ioMonitor)
+                .withFileStoreStats(stats)
+                .withMaxFileSize(maxFileSize)
+                .build();
+        this.stats.init(this.tarFiles.size());
+
         this.snfeListener = builder.getSnfeListener();
 
         fileStoreScheduler.scheduleAtFixedRate(
@@ -253,7 +236,7 @@ public class FileStore extends AbstractF
                     }
                 });
         log.info("TarMK opened: {} (mmap={})", directory, memoryMapping);
-        log.debug("TarMK readers {}", this.readers);
+        log.debug("TAR files: {}", tarFiles);
     }
 
     FileStore bind(TarRevisions revisions) throws IOException {
@@ -309,36 +292,14 @@ public class FileStore extends AbstractF
     }
 
     /**
-     * @return the size of this store. This method shouldn't be called from
-     * a very tight loop as it contends with the {@link #fileStoreLock}.
+     * @return the size of this store.
      */
     private long size() {
-        List<TarReader> readersSnapshot;
-        long writeFileSnapshotSize;
-
-        fileStoreLock.readLock().lock();
-        try {
-            readersSnapshot = ImmutableList.copyOf(readers);
-            writeFileSnapshotSize = tarWriter != null ? tarWriter.fileLength() : 0;
-        } finally {
-            fileStoreLock.readLock().unlock();
-        }
-
-        long size = writeFileSnapshotSize;
-        for (TarReader reader : readersSnapshot) {
-            size += reader.size();
-        }
-
-        return size;
+        return tarFiles.size();
     }
 
     public int readerCount(){
-        fileStoreLock.readLock().lock();
-        try {
-            return readers.size();
-        } finally {
-            fileStoreLock.readLock().unlock();
-        }
+        return tarFiles.readerCount();
     }
 
     public FileStoreStats getStats() {
@@ -353,9 +314,8 @@ public class FileStore extends AbstractF
             @Override
             public Void call() throws Exception {
                 segmentWriter.flush();
-                tarWriter.flush();
+                tarFiles.flush();
                 stats.flushed();
-
                 return null;
             }
         });
@@ -453,27 +413,15 @@ public class FileStore extends AbstractF
 
         Closer closer = Closer.create();
         closer.register(revisions);
-        fileStoreLock.writeLock().lock();
-        try {
-            if (lock != null) {
-                try {
-                    lock.release();
-                } catch (IOException e) {
-                    log.warn("Unable to release the file lock", e);
-                }
-            }
-            closer.register(lockFile);
-
-            List<TarReader> list = readers;
-            readers = newArrayList();
-            for (TarReader reader : list) {
-                closer.register(reader);
+        if (lock != null) {
+            try {
+                lock.release();
+            } catch (IOException e) {
+                log.warn("Unable to release the file lock", e);
             }
-
-            closer.register(tarWriter);
-        } finally {
-            fileStoreLock.writeLock().unlock();
         }
+        closer.register(lockFile);
+        closer.register(tarFiles);
         closeAndLogOnFail(closer);
 
         // Try removing pending files in case the scheduler didn't have a chance to run yet
@@ -485,24 +433,7 @@ public class FileStore extends AbstractF
 
     @Override
     public boolean containsSegment(SegmentId id) {
-        if (FileStoreUtil.containSegment(readers, id)) {
-            return true;
-        }
-
-        if (tarWriter != null) {
-            fileStoreLock.readLock().lock();
-            try {
-                if (tarWriter.containsEntry(id.getMostSignificantBits(), id.getLeastSignificantBits())) {
-                    return true;
-                }
-            } finally {
-                fileStoreLock.readLock().unlock();
-            }
-        }
-
-        // the writer might have switched to a new file,
-        // so we need to re-check the readers
-        return FileStoreUtil.containSegment(readers, id);
+        return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
     }
 
     @Override
@@ -512,38 +443,7 @@ public class FileStore extends AbstractF
             return segmentCache.getSegment(id, new Callable<Segment>() {
                 @Override
                 public Segment call() throws Exception {
-                    ByteBuffer buffer = FileStoreUtil.readEntry(readers, id);
-                    if (buffer != null) {
-                        return new Segment(tracker, segmentReader, id, buffer);
-                    }
-
-                    if (tarWriter != null) {
-                        fileStoreLock.readLock().lock();
-                        try {
-                            try {
-                                buffer = tarWriter.readEntry(id.getMostSignificantBits(), id.getLeastSignificantBits());
-                                if (buffer != null) {
-                                    return new Segment(tracker, segmentReader, id, buffer);
-                                }
-                            } catch (IOException e) {
-                                log.warn("Failed to read from tar file {}", tarWriter, e);
-                            }
-                        } finally {
-                            fileStoreLock.readLock().unlock();
-                        }
-                    }
-
-                    // The TarWriter might have become a TarReader in the
-                    // meantime. Moreover, the TarWriter that became a TarReader
-                    // might have additional entries. Because of this, we need
-                    // to check the list of TarReaders once more.
-
-                    buffer = FileStoreUtil.readEntry(readers, id);
-                    if (buffer != null) {
-                        return new Segment(tracker, segmentReader, id, buffer);
-                    }
-
-                    throw new SegmentNotFoundException(id);
+                    return readSegmentUncached(tarFiles, id);
                 }
             });
         } catch (ExecutionException e) {
@@ -553,13 +453,6 @@ public class FileStore extends AbstractF
         }
     }
 
-    private static SegmentNotFoundException asSegmentNotFoundException(ExecutionException e, SegmentId id) {
-        if (e.getCause() instanceof SegmentNotFoundException) {
-            return (SegmentNotFoundException) e.getCause();
-        }
-        return new SegmentNotFoundException(id, e);
-    }
-
     @Override
     public void writeSegment(SegmentId id, byte[] buffer, int offset, int length) throws IOException {
         Segment segment = null;
@@ -569,6 +462,9 @@ public class FileStore extends AbstractF
         // in an in-memory cache for later use.
 
         int generation = 0;
+        Set<UUID> references = null;
+        Set<String> binaryReferences = null;
+
         if (id.isDataSegmentId()) {
             ByteBuffer data;
 
@@ -582,63 +478,26 @@ public class FileStore extends AbstractF
 
             segment = new Segment(tracker, segmentReader, id, data);
             generation = segment.getGcGeneration();
+            references = readReferences(segment);
+            binaryReferences = readBinaryReferences(segment);
         }
 
-        fileStoreLock.writeLock().lock();
-        try {
-            // Flush the segment to disk
-
-            long size = tarWriter.writeEntry(
-                    id.getMostSignificantBits(),
-                    id.getLeastSignificantBits(),
-                    buffer,
-                    offset,
-                    length,
-                    generation
-            );
-
-            // If the segment is a data segment, update the graph before
-            // (potentially) flushing the TAR file.
-
-            if (segment != null) {
-                populateTarGraph(segment, tarWriter);
-                populateTarBinaryReferences(segment, tarWriter);
-            }
-
-            // Close the TAR file if the size exceeds the maximum.
-
-            if (size >= maxFileSize) {
-                newWriter();
-            }
-        } finally {
-            fileStoreLock.writeLock().unlock();
-        }
+        tarFiles.writeSegment(
+                id.asUUID(),
+                buffer,
+                offset,
+                length,
+                generation,
+                references,
+                binaryReferences
+        );
 
         // Keep this data segment in memory as it's likely to be accessed soon.
-
         if (segment != null) {
             segmentCache.putSegment(segment);
         }
     }
 
-    /**
-     * Switch to a new tar writer.
-     * This method may only be called when holding the write lock of {@link #fileStoreLock}
-     * @throws IOException
-     */
-    private void newWriter() throws IOException {
-        TarWriter newWriter = tarWriter.createNextGeneration();
-        if (newWriter != tarWriter) {
-            File writeFile = tarWriter.getFile();
-            List<TarReader> list =
-                    newArrayListWithCapacity(1 + readers.size());
-            list.add(TarReader.open(writeFile, memoryMapping, ioMonitor));
-            list.addAll(readers);
-            readers = list;
-            tarWriter = newWriter;
-        }
-    }
-
     private void checkDiskSpace(SegmentGCOptions gcOptions) {
         long repositoryDiskSpace = size();
         long availableDiskSpace = directory.getFreeSpace();
@@ -943,99 +802,28 @@ public class FileStore extends AbstractF
         throws IOException {
             Stopwatch watch = Stopwatch.createStarted();
             Set<UUID> bulkRefs = newHashSet();
-            Map<TarReader, TarReader> cleaned = newLinkedHashMap();
 
-            long initialSize = 0;
-            fileStoreLock.writeLock().lock();
-            try {
-                gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT);
-                gcListener.updateStatus(CLEANUP.message());
-
-                newWriter();
-                segmentCache.clear();
-
-                // Suggest to the JVM that now would be a good time
-                // to clear stale weak references in the SegmentTracker
-                System.gc();
-
-                for (SegmentId id : tracker.getReferencedSegmentIds()) {
-                    if (id.isBulkSegmentId()) {
-                        bulkRefs.add(id.asUUID());
-                    }
-                }
-
-                for (TarReader reader : readers) {
-                    cleaned.put(reader, reader);
-                    initialSize += reader.size();
-                }
-            } finally {
-                fileStoreLock.writeLock().unlock();
+            gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT);
+            gcListener.updateStatus(CLEANUP.message());
+            segmentCache.clear();
+
+            // Suggest to the JVM that now would be a good time
+            // to clear stale weak references in the SegmentTracker
+            System.gc();
+
+            for (SegmentId id : tracker.getReferencedSegmentIds()) {
+                bulkRefs.add(id.asUUID());
+            }
+
+            CleanupResult cleanupResult = tarFiles.cleanup(bulkRefs, compactionResult.reclaimer());
+            if (cleanupResult.isInterrupted()) {
+                gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
             }
-
-            gcListener.info("TarMK GC #{}: current repository size is {} ({} 
bytes)",
-                    GC_COUNT, humanReadableByteCount(initialSize), 
initialSize);
-
-            Set<UUID> reclaim = newHashSet();
-            for (TarReader reader : cleaned.keySet()) {
-                reader.mark(bulkRefs, reclaim, compactionResult.reclaimer());
-                log.info("{}: size of bulk references/reclaim set {}/{}",
-                        reader, bulkRefs.size(), reclaim.size());
-                if (shutdown) {
-                    gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
-                    break;
-                }
-            }
-            Set<UUID> reclaimed = newHashSet();
-            for (TarReader reader : cleaned.keySet()) {
-                cleaned.put(reader, reader.sweep(reclaim, reclaimed));
-                if (shutdown) {
-                    gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
-                    break;
-                }
-            }
-
-            // it doesn't account for concurrent commits that might have happened
-            long afterCleanupSize = 0;
-
-            List<TarReader> oldReaders = newArrayList();
-            fileStoreLock.writeLock().lock();
-            try {
-                // Replace current list of reader with the cleaned readers taking care not to lose
-                // any new reader that might have come in through concurrent calls to newWriter()
-                List<TarReader> sweptReaders = newArrayList();
-                for (TarReader reader : readers) {
-                    if (cleaned.containsKey(reader)) {
-                        TarReader newReader = cleaned.get(reader);
-                        if (newReader != null) {
-                            sweptReaders.add(newReader);
-                            afterCleanupSize += newReader.size();
-                        }
-                        // if these two differ, the former represents the swept version of the latter
-                        if (newReader != reader) {
-                            oldReaders.add(reader);
-                        }
-                    } else {
-                        sweptReaders.add(reader);
-                    }
-                }
-                readers = sweptReaders;
-            } finally {
-                fileStoreLock.writeLock().unlock();
-            }
-            tracker.clearSegmentIdTables(reclaimed, compactionResult.gcInfo());
-
-            // Close old readers *after* setting readers to the new readers to avoid accessing
-            // a closed reader from readSegment()
-            LinkedList<File> toRemove = newLinkedList();
-            for (TarReader oldReader : oldReaders) {
-                closeAndLogOnFail(oldReader);
-                File file = oldReader.getFile();
-                toRemove.addLast(file);
-            }
-            gcListener.info("TarMK GC #{}: cleanup marking files for deletion: 
{}", GC_COUNT, toFileNames(toRemove));
+            
tracker.clearSegmentIdTables(cleanupResult.getReclaimedSegmentIds(), 
compactionResult.gcInfo());
+            gcListener.info("TarMK GC #{}: cleanup marking files for deletion: 
{}", GC_COUNT, toFileNames(cleanupResult.getRemovableFiles()));
 
             long finalSize = size();
-            long reclaimedSize = initialSize - afterCleanupSize;
+            long reclaimedSize = cleanupResult.getReclaimedSize();
             stats.reclaimed(reclaimedSize);
             gcJournal.persist(reclaimedSize, finalSize, getGcGeneration(),
                     compactionMonitor.getCompactedNodes(),
@@ -1046,7 +834,7 @@ public class FileStore extends AbstractF
                     GC_COUNT, watch, watch.elapsed(MILLISECONDS),
                     humanReadableByteCount(finalSize), finalSize,
                     humanReadableByteCount(reclaimedSize), reclaimedSize);
-            return toRemove;
+            return cleanupResult.getRemovableFiles();
         }
 
         private String toFileNames(@Nonnull List<File> files) {
@@ -1056,7 +844,13 @@ public class FileStore extends AbstractF
                 return Joiner.on(",").join(files);
             }
         }
-        
+
+        private void collectBulkReferences(Set<UUID> bulkRefs) {
+            for (SegmentId id : tracker.getReferencedSegmentIds()) {
+                bulkRefs.add(id.asUUID());
+            }
+        }
+
         /**
          * Finds all external blob references that are currently accessible
          * in this repository and adds them to the given collector. Useful
@@ -1071,19 +865,8 @@ public class FileStore extends AbstractF
          */
         synchronized void collectBlobReferences(ReferenceCollector collector) throws IOException {
             segmentWriter.flush();
-            List<TarReader> tarReaders = newArrayList();
-            fileStoreLock.writeLock().lock();
-            try {
-                newWriter();
-                tarReaders.addAll(FileStore.this.readers);
-            } finally {
-                fileStoreLock.writeLock().unlock();
-            }
-
             int minGeneration = getGcGeneration() - gcOptions.getRetainedGenerations() + 1;
-            for (TarReader tarReader : tarReaders) {
-                tarReader.collectBlobReferences(collector, minGeneration);
-            }
+            tarFiles.collectBlobReferences(collector, minGeneration);
         }
 
         void cancel() {
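
With the facade in place, FileStore#writeSegment reduces to metadata
extraction plus a single call, as the hunk above shows. A condensed sketch of
the new write path (names as in the diff; error handling elided):

    // Only data segments carry generation, graph, and binary-reference
    // metadata; bulk segments pass null sets.
    int generation = 0;
    Set<UUID> references = null;
    Set<String> binaryReferences = null;
    if (id.isDataSegmentId()) {
        Segment segment = new Segment(tracker, segmentReader, id, data);
        generation = segment.getGcGeneration();
        references = readReferences(segment);
        binaryReferences = readBinaryReferences(segment);
    }
    // TarFiles takes its own write lock, appends the entry, records the
    // graph and binary references, and rolls over to a new TAR file once
    // maxFileSize is exceeded.
    tarFiles.writeSegment(id.asUUID(), buffer, offset, length, generation,
            references, binaryReferences);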

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java?rev=1790084&r1=1790083&r2=1790084&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java Tue Apr  4 09:49:13 2017
@@ -51,6 +51,10 @@ public class FileStoreStats implements F
         repoSize.inc(initialSize);
     }
 
+    public void init(long initialSize) {
+        repoSize.inc(initialSize);
+    }
+
     //~-----------------------------< FileStoreMonitor >
 
     @Override
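
The new init method resolves an ordering constraint visible in the FileStore
constructor above: TarFiles reports to the stats object while opening the
existing TAR files, but the initial repository size is only known once they
are open. A sketch of the resulting two-phase initialization (statsProvider
and store are assumed to be in scope):

    // Phase 1: create the stats with an initial size of 0 so TarFiles can
    // report to it while opening the existing TAR files.
    FileStoreStats stats = new FileStoreStats(statsProvider, store, 0);
    TarFiles tarFiles = TarFiles.builder()
            .withDirectory(directory)
            .withTarRecovery(recovery)
            .withIOMonitor(ioMonitor)
            .withFileStoreStats(stats)
            .withMaxFileSize(maxFileSize)
            .withMemoryMapping(memoryMapping)
            .build();
    // Phase 2: account for the size of the TAR files that already existed.
    stats.init(tarFiles.size());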

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java?rev=1790084&r1=1790083&r2=1790084&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java Tue Apr  4 09:49:13 2017
@@ -67,40 +67,50 @@ class FileStoreUtil {
         return null;
     }
 
+    static boolean containSegment(List<TarReader> readers, SegmentId id) {
+        return containSegment(readers, id.getMostSignificantBits(), id.getLeastSignificantBits());
+    }
+
     /**
      * Check if a segment is contained in one of the provided TAR files.
      *
      * @param readers A list of {@link TarReader} instances.
-     * @param id      An instance of {@link SegmentId}.
+     * @param msb     Most significant bits of the segment ID.
+     * @param lsb     Least significant bits of the segment ID.
      * @return {@code true} if the segment is contained in at least one of the
      * provided TAR files, {@code false} otherwise.
      */
-    static boolean containSegment(List<TarReader> readers, SegmentId id) {
+    static boolean containSegment(List<TarReader> readers, long msb, long lsb) {
         for (TarReader reader : readers) {
-            if (reader.containsEntry(id.getMostSignificantBits(), id.getLeastSignificantBits())) {
+            if (reader.containsEntry(msb, lsb)) {
                 return true;
             }
         }
         return false;
     }
 
+    static ByteBuffer readEntry(List<TarReader> readers, SegmentId id) {
+        return readEntry(readers, id.getMostSignificantBits(), id.getLeastSignificantBits());
+    }
+
     /**
      * Read the entry corresponding to a segment from one of the provided TAR
      * files.
      *
      * @param readers A list of {@link TarReader} instances.
-     * @param id      An instance of {@link SegmentId}.
+     * @param msb     Most significant bits of the segment ID.
+     * @param lsb     Least significant bits of the segment ID.
      * @return An instance of {@link ByteBuffer} if the entry for the segment
      * could be found, {@code null} otherwise.
      */
-    static ByteBuffer readEntry(List<TarReader> readers, SegmentId id) {
+    static ByteBuffer readEntry(List<TarReader> readers, long msb, long lsb) {
         for (TarReader reader : readers) {
             if (reader.isClosed()) {
                 log.debug("Skipping closed tar file {}", reader);
                 continue;
             }
             try {
-                ByteBuffer buffer = reader.readEntry(id.getMostSignificantBits(), id.getLeastSignificantBits());
+                ByteBuffer buffer = reader.readEntry(msb, lsb);
                 if (buffer != null) {
                     return buffer;
                 }
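
The SegmentId variants are kept as thin adapters over the new (msb, lsb)
overloads, so callers that already hold the raw bits avoid materializing a
SegmentId. Both forms resolve to the same lookup; a brief sketch, assuming
readers and id are in scope:

    boolean viaId = FileStoreUtil.containSegment(readers, id);
    boolean viaBits = FileStoreUtil.containSegment(readers,
            id.getMostSignificantBits(), id.getLeastSignificantBits());
    // viaId == viaBits for the same segment ID.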

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java?rev=1790084&r1=1790083&r2=1790084&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java Tue Apr  4 09:49:13 2017
@@ -18,19 +18,10 @@
  */
 package org.apache.jackrabbit.oak.segment.file;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Sets.newHashSet;
-import static java.util.Collections.emptyMap;
 import static org.apache.jackrabbit.oak.segment.SegmentWriterBuilder.segmentWriterBuilder;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -45,7 +36,6 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.Segment;
 import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
 import org.apache.jackrabbit.oak.segment.SegmentId;
-import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.segment.SegmentWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +51,7 @@ public class ReadOnlyFileStore extends A
     private static final Logger log = LoggerFactory
             .getLogger(ReadOnlyFileStore.class);
 
-    private final List<TarReader> readers;
+    private final TarFiles tarFiles;
 
     @Nonnull
     private final SegmentWriter writer;
@@ -73,21 +63,17 @@ public class ReadOnlyFileStore extends A
     ReadOnlyFileStore(FileStoreBuilder builder) throws InvalidFileStoreVersionException, IOException {
         super(builder);
 
-        Map<Integer, Map<Character, File>> map = collectFiles(directory);
-
-        if (!map.isEmpty()) {
+        if (notEmptyDirectory(directory)) {
             checkManifest(openManifest());
         }
 
-        this.readers = newArrayListWithCapacity(map.size());
-        Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
-        Arrays.sort(indices);
-        for (int i = indices.length - 1; i >= 0; i--) {
-            // only try to read-only recover the latest file as that might
-            // be the *only* one still being accessed by a writer
-            boolean recover = i == indices.length - 1;
-            readers.add(TarReader.openRO(map.get(indices[i]), memoryMapping, recover, recovery, ioMonitor));
-        }
+        tarFiles = TarFiles.builder()
+                .withDirectory(directory)
+                .withTarRecovery(recovery)
+                .withIOMonitor(ioMonitor)
+                .withMemoryMapping(memoryMapping)
+                .withReadOnly()
+                .build();
 
         writer = segmentWriterBuilder("read-only").withoutCache().build(this);
         log.info("TarMK ReadOnly opened: {} (mmap={})", directory,
@@ -114,25 +100,6 @@ public class ReadOnlyFileStore extends A
     }
 
     /**
-     * Include the ids of all segments transitively reachable through forward
-     * references from {@code referencedIds}. See OAK-3864.
-     */
-    private static void includeForwardReferences(Iterable<TarReader> readers,
-            Set<UUID> referencedIds) throws IOException {
-        Set<UUID> fRefs = newHashSet(referencedIds);
-        do {
-            // Add direct forward references
-            for (TarReader reader : readers) {
-                reader.calculateForwardReferences(fRefs);
-                if (fRefs.isEmpty()) {
-                    break; // Optimisation: bail out if no references left
-                }
-            }
-            // ... as long as new forward references are found.
-        } while (referencedIds.addAll(fRefs));
-    }
-
-    /**
      * Build the graph of segments reachable from an initial set of segments
      * 
      * @param roots
@@ -141,15 +108,8 @@ public class ReadOnlyFileStore extends A
      *            visitor receiving call back while following the segment graph
      * @throws IOException
      */
-    public void traverseSegmentGraph(@Nonnull Set<UUID> roots,
-            @Nonnull SegmentGraphVisitor visitor) throws IOException {
-
-        List<TarReader> readers = this.readers;
-        includeForwardReferences(readers, roots);
-        for (TarReader reader : readers) {
-            reader.traverseSegmentGraph(checkNotNull(roots),
-                    checkNotNull(visitor));
-        }
+    public void traverseSegmentGraph(@Nonnull Set<UUID> roots, @Nonnull SegmentGraphVisitor visitor) throws IOException {
+        tarFiles.traverseSegmentGraph(roots, visitor);
     }
 
     @Override
@@ -159,7 +119,7 @@ public class ReadOnlyFileStore extends A
 
     @Override
     public boolean containsSegment(SegmentId id) {
-        return FileStoreUtil.containSegment(readers, id);
+        return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
     }
 
     @Override
@@ -169,26 +129,18 @@ public class ReadOnlyFileStore extends A
             return segmentCache.getSegment(id, new Callable<Segment>() {
                 @Override
                 public Segment call() throws Exception {
-                    ByteBuffer buffer = FileStoreUtil.readEntry(readers, id);
-                    if (buffer == null) {
-                        throw new SegmentNotFoundException(id);
-                    }
-                    return new Segment(tracker, segmentReader, id, buffer);
+                    return readSegmentUncached(tarFiles, id);
                 }
             });
         } catch (ExecutionException e) {
-            throw e.getCause() instanceof SegmentNotFoundException
-                ? (SegmentNotFoundException) e.getCause()
-                : new SegmentNotFoundException(id, e);
+            throw asSegmentNotFoundException(e, id);
         }
     }
 
     @Override
     public void close() {
         Closer closer = Closer.create();
-        for (TarReader r : readers) {
-            closer.register(r);
-        }
+        closer.register(tarFiles);
         closer.register(revisions);
         closeAndLogOnFail(closer);
         System.gc(); // for any memory-mappings that are no longer used
@@ -202,39 +154,19 @@ public class ReadOnlyFileStore extends A
     }
 
     public Map<String, Set<UUID>> getTarReaderIndex() {
-        Map<String, Set<UUID>> index = new HashMap<String, Set<UUID>>();
-        for (TarReader reader : readers) {
-            index.put(reader.getFile().getAbsolutePath(), reader.getUUIDs());
-        }
-        return index;
+        return tarFiles.getIndices();
     }
 
-    public Map<UUID, List<UUID>> getTarGraph(String fileName)
-            throws IOException {
-        for (TarReader reader : readers) {
-            if (fileName.equals(reader.getFile().getName())) {
-                Map<UUID, List<UUID>> graph = newHashMap();
-                for (UUID uuid : reader.getUUIDs()) {
-                    graph.put(uuid, null);
-                }
-                Map<UUID, List<UUID>> g = reader.getGraph(false);
-                if (g != null) {
-                    graph.putAll(g);
-                }
-                return graph;
-            }
-        }
-        return emptyMap();
+    public Map<UUID, List<UUID>> getTarGraph(String fileName) throws IOException {
+        return tarFiles.getGraph(fileName);
     }
 
     public Iterable<SegmentId> getSegmentIds() {
-        List<SegmentId> ids = newArrayList();
-        for (TarReader reader : readers) {
-            for (UUID uuid : reader.getUUIDs()) {
-                long msb = uuid.getMostSignificantBits();
-                long lsb = uuid.getLeastSignificantBits();
-                ids.add(tracker.newSegmentId(msb, lsb));
-            }
+        List<SegmentId> ids = new ArrayList<>();
+        for (UUID id : tarFiles.getSegmentIds()) {
+            long msb = id.getMostSignificantBits();
+            long lsb = id.getLeastSignificantBits();
+            ids.add(tracker.newSegmentId(msb, lsb));
         }
         return ids;
     }
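
ReadOnlyFileStore now obtains its readers through the same builder, replacing
the duplicated open/recovery loop with a single withReadOnly() switch. In
read-only mode no TarWriter is created, which is why build() waives the
fileStoreStats and maxFileSize requirements. A sketch with the fields used in
the diff:

    TarFiles tarFiles = TarFiles.builder()
            .withDirectory(directory)
            .withTarRecovery(recovery)
            .withIOMonitor(ioMonitor)
            .withMemoryMapping(memoryMapping)
            .withReadOnly()   // no writer; stats and max file size not required
            .build();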

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java?rev=1790084&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java Tue Apr  4 09:49:13 2017
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.newArrayListWithCapacity;
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Maps.newLinkedHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+import static org.apache.commons.io.FileUtils.listFiles;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Predicate;
+import com.google.common.io.Closer;
+import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class TarFiles implements Closeable {
+
+    static class CleanupResult {
+
+        private boolean interrupted;
+
+        private long reclaimedSize;
+
+        private List<File> removableFiles;
+
+        private Set<UUID> reclaimedSegmentIds;
+
+        private CleanupResult() {
+            // Prevent external instantiation.
+        }
+
+        long getReclaimedSize() {
+            return reclaimedSize;
+        }
+
+        List<File> getRemovableFiles() {
+            return removableFiles;
+        }
+
+        Set<UUID> getReclaimedSegmentIds() {
+            return reclaimedSegmentIds;
+        }
+
+        boolean isInterrupted() {
+            return interrupted;
+        }
+
+    }
+
+    static class Builder {
+
+        private File directory;
+
+        private boolean memoryMapping;
+
+        private TarRecovery tarRecovery;
+
+        private IOMonitor ioMonitor;
+
+        private FileStoreStats fileStoreStats;
+
+        private long maxFileSize;
+
+        private boolean readOnly;
+
+        Builder withDirectory(File directory) {
+            this.directory = checkNotNull(directory);
+            return this;
+        }
+
+        Builder withMemoryMapping(boolean memoryMapping) {
+            this.memoryMapping = memoryMapping;
+            return this;
+        }
+
+        Builder withTarRecovery(TarRecovery tarRecovery) {
+            this.tarRecovery = checkNotNull(tarRecovery);
+            return this;
+        }
+
+        Builder withIOMonitor(IOMonitor ioMonitor) {
+            this.ioMonitor = checkNotNull(ioMonitor);
+            return this;
+        }
+
+        Builder withFileStoreStats(FileStoreStats fileStoreStats) {
+            this.fileStoreStats = checkNotNull(fileStoreStats);
+            return this;
+        }
+
+        Builder withMaxFileSize(long maxFileSize) {
+            checkArgument(maxFileSize > 0);
+            this.maxFileSize = maxFileSize;
+            return this;
+        }
+
+        Builder withReadOnly() {
+            this.readOnly = true;
+            return this;
+        }
+
+        public TarFiles build() throws IOException {
+            checkState(directory != null, "Directory not specified");
+            checkState(tarRecovery != null, "TAR recovery strategy not specified");
+            checkState(ioMonitor != null, "I/O monitor not specified");
+            checkState(readOnly || fileStoreStats != null, "File store statistics not specified");
+            checkState(readOnly || maxFileSize != 0, "Max file size not specified");
+            return new TarFiles(this);
+        }
+
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(TarFiles.class);
+
+    private static final Pattern FILE_NAME_PATTERN = Pattern.compile("(data)((0|[1-9][0-9]*)[0-9]{4})([a-z])?.tar");
+
+    private static Map<Integer, Map<Character, File>> collectFiles(File directory) {
+        Map<Integer, Map<Character, File>> dataFiles = newHashMap();
+        for (File file : listFiles(directory, null, false)) {
+            Matcher matcher = FILE_NAME_PATTERN.matcher(file.getName());
+            if (matcher.matches()) {
+                Integer index = Integer.parseInt(matcher.group(2));
+                Map<Character, File> files = dataFiles.get(index);
+                if (files == null) {
+                    files = newHashMap();
+                    dataFiles.put(index, files);
+                }
+                Character generation = 'a';
+                if (matcher.group(4) != null) {
+                    generation = matcher.group(4).charAt(0);
+                }
+                checkState(files.put(generation, file) == null);
+            }
+        }
+        return dataFiles;
+    }
+
+    /**
+     * Include the ids of all segments transitively reachable through forward
+     * references from {@code referencedIds}. See OAK-3864.
+     */
+    private static void includeForwardReferences(Iterable<TarReader> readers, Set<UUID> referencedIds) throws IOException {
+        Set<UUID> fRefs = newHashSet(referencedIds);
+        do {
+            // Add direct forward references
+            for (TarReader reader : readers) {
+                reader.calculateForwardReferences(fRefs);
+                if (fRefs.isEmpty()) {
+                    break; // Optimisation: bail out if no references left
+                }
+            }
+            // ... as long as new forward references are found.
+        } while (referencedIds.addAll(fRefs));
+    }
+
+    static Builder builder() {
+        return new Builder();
+    }
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private final long maxFileSize;
+
+    private final boolean memoryMapping;
+
+    private final IOMonitor ioMonitor;
+
+    private final boolean readOnly;
+
+    private List<TarReader> readers;
+
+    private TarWriter writer;
+
+    private volatile boolean shutdown;
+
+    private boolean closed;
+
+    private TarFiles(Builder builder) throws IOException {
+        maxFileSize = builder.maxFileSize;
+        memoryMapping = builder.memoryMapping;
+        ioMonitor = builder.ioMonitor;
+        readOnly = builder.readOnly;
+        Map<Integer, Map<Character, File>> map = collectFiles(builder.directory);
+        readers = newArrayListWithCapacity(map.size());
+        Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
+        Arrays.sort(indices);
+        for (int i = indices.length - 1; i >= 0; i--) {
+            if (readOnly) {
+                readers.add(TarReader.openRO(map.get(indices[i]), memoryMapping, true, builder.tarRecovery, ioMonitor));
+            } else {
+                readers.add(TarReader.open(map.get(indices[i]), memoryMapping, builder.tarRecovery, ioMonitor));
+            }
+        }
+        if (!readOnly) {
+            int writeNumber = 0;
+            if (indices.length > 0) {
+                writeNumber = indices[indices.length - 1] + 1;
+            }
+            writer = new TarWriter(builder.directory, builder.fileStoreStats, writeNumber, builder.ioMonitor);
+        }
+    }
+
+    private void checkOpen() {
+        checkState(!closed, "This instance has been closed");
+    }
+
+    private void checkReadWrite() {
+        checkState(!readOnly, "This instance is read-only");
+    }
+
+    @Override
+    public void close() throws IOException {
+        shutdown = true;
+        lock.writeLock().lock();
+        try {
+            checkOpen();
+            closed = true;
+            Closer closer = Closer.create();
+            closer.register(writer);
+            writer = null;
+            for (TarReader reader : readers) {
+                closer.register(reader);
+            }
+            readers = null;
+            closer.close();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public String toString() {
+        lock.readLock().lock();
+        try {
+            return "TarFiles{readers=" + readers + ", writer=" + writer + "}";
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    long size() {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            long size = 0;
+            if (!readOnly) {
+                size = writer.fileLength();
+            }
+            for (TarReader reader : readers) {
+                size += reader.size();
+            }
+            return size;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    int readerCount() {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            return readers.size();
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    void flush() throws IOException {
+        checkReadWrite();
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            writer.flush();
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    boolean containsSegment(long msb, long lsb) {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            if (!readOnly) {
+                if (writer.containsEntry(msb, lsb)) {
+                    return true;
+                }
+            }
+            for (TarReader reader : readers) {
+                if (reader.containsEntry(msb, lsb)) {
+                    return true;
+                }
+            }
+            return false;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    ByteBuffer readSegment(long msb, long lsb) {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            try {
+                if (!readOnly) {
+                    ByteBuffer buffer = writer.readEntry(msb, lsb);
+                    if (buffer != null) {
+                        return buffer;
+                    }
+                }
+                for (TarReader reader : readers) {
+                    ByteBuffer buffer = reader.readEntry(msb, lsb);
+                    if (buffer != null) {
+                        return buffer;
+                    }
+                }
+            } catch (IOException e) {
+                log.warn("Unable to read from TAR file {}", writer, e);
+            }
+            return null;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    void writeSegment(UUID id, byte[] buffer, int offset, int length, int generation, Set<UUID> references, Set<String> binaryReferences) throws IOException {
+        checkReadWrite();
+        lock.writeLock().lock();
+        try {
+            checkOpen();
+            long size = writer.writeEntry(
+                    id.getMostSignificantBits(),
+                    id.getLeastSignificantBits(),
+                    buffer,
+                    offset,
+                    length,
+                    generation
+            );
+            if (references != null) {
+                for (UUID reference : references) {
+                    writer.addGraphEdge(id, reference);
+                }
+            }
+            if (binaryReferences != null) {
+                for (String reference : binaryReferences) {
+                    writer.addBinaryReference(generation, id, reference);
+                }
+            }
+            if (size >= maxFileSize) {
+                newWriter();
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void newWriter() throws IOException {
+        TarWriter newWriter = writer.createNextGeneration();
+        if (newWriter == writer) {
+            return;
+        }
+        File writeFile = writer.getFile();
+        List<TarReader> list = newArrayListWithCapacity(1 + readers.size());
+        list.add(TarReader.open(writeFile, memoryMapping, ioMonitor));
+        list.addAll(readers);
+        readers = list;
+        writer = newWriter;
+    }
+
+    CleanupResult cleanup(Set<UUID> references, Predicate<Integer> reclaimGeneration) throws IOException {
+        checkReadWrite();
+
+        CleanupResult result = new CleanupResult();
+        result.removableFiles = new ArrayList<>();
+        result.reclaimedSegmentIds = new HashSet<>();
+
+        Map<TarReader, TarReader> cleaned = newLinkedHashMap();
+
+        lock.writeLock().lock();
+        lock.readLock().lock();
+        try {
+            try {
+                checkOpen();
+                newWriter();
+            } finally {
+                lock.writeLock().unlock();
+            }
+
+            // At this point the write lock is downgraded to a read lock for
+            // better concurrency. It is always necessary to access TarReader
+            // and TarWriter instances while holding a lock (either in read or
+            // write mode) to prevent a concurrent #close(). In this case, we
+            // don't need an exclusive access to the TarReader instances.
+
+            // TODO now that the two protected sections have been merged thanks
+            // to lock downgrading, check if the following code can be
+            // simplified.
+
+            for (TarReader reader : readers) {
+                cleaned.put(reader, reader);
+                result.reclaimedSize += reader.size();
+            }
+            Set<UUID> reclaim = newHashSet();
+            for (TarReader reader : cleaned.keySet()) {
+                if (shutdown) {
+                    result.interrupted = true;
+                    return result;
+                }
+                reader.mark(references, reclaim, reclaimGeneration);
+                log.info("{}: size of bulk references/reclaim set {}/{}", reader, references.size(), reclaim.size());
+            }
+            for (TarReader reader : cleaned.keySet()) {
+                if (shutdown) {
+                    result.interrupted = true;
+                    return result;
+                }
+                cleaned.put(reader, reader.sweep(reclaim, result.reclaimedSegmentIds));
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        List<TarReader> oldReaders = newArrayList();
+        lock.writeLock().lock();
+        try {
+            // Replace the current list of readers with the cleaned readers, taking care not to lose
+            // any new reader that might have come in through concurrent calls to newWriter()
+            checkOpen();
+            List<TarReader> sweptReaders = newArrayList();
+            for (TarReader reader : readers) {
+                if (cleaned.containsKey(reader)) {
+                    TarReader newReader = cleaned.get(reader);
+                    if (newReader != null) {
+                        sweptReaders.add(newReader);
+                        result.reclaimedSize -= newReader.size();
+                    }
+                    // if these two differ, the former represents the swept version of the latter
+                    if (newReader != reader) {
+                        oldReaders.add(reader);
+                    }
+                } else {
+                    sweptReaders.add(reader);
+                }
+            }
+            readers = sweptReaders;
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        for (TarReader oldReader : oldReaders) {
+            try {
+                oldReader.close();
+            } catch (IOException e) {
+                log.error("Unable to close swept TAR reader", e);
+            }
+            result.removableFiles.add(oldReader.getFile());
+        }
+
+        return result;
+    }
+
+    void collectBlobReferences(ReferenceCollector collector, int minGeneration) throws IOException {
+        lock.writeLock().lock();
+        lock.readLock().lock();
+        try {
+            try {
+                checkOpen();
+                if (!readOnly) {
+                    newWriter();
+                }
+            } finally {
+                lock.writeLock().unlock();
+            }
+
+            // At this point the write lock is downgraded to a read lock for
+            // better concurrency. It is always necessary to access TarReader
+            // and TarWriter instances while holding a lock (either in read or
+            // write mode) to prevent a concurrent #close(). In this case, we
+            // don't need an exclusive access to the TarReader instances.
+
+            for (TarReader reader : readers) {
+                reader.collectBlobReferences(collector, minGeneration);
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    Iterable<UUID> getSegmentIds() {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            List<UUID> ids = new ArrayList<>();
+            for (TarReader reader : readers) {
+                ids.addAll(reader.getUUIDs());
+            }
+            return ids;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    Map<UUID, List<UUID>> getGraph(String fileName) throws IOException {
+        Set<UUID> index = null;
+        Map<UUID, List<UUID>> graph = null;
+
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            for (TarReader reader : readers) {
+                if (fileName.equals(reader.getFile().getName())) {
+                    index = reader.getUUIDs();
+                    graph = reader.getGraph(false);
+                    break;
+                }
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        Map<UUID, List<UUID>> result = new HashMap<>();
+        if (index != null) {
+            for (UUID uuid : index) {
+                result.put(uuid, null);
+            }
+        }
+        if (graph != null) {
+            result.putAll(graph);
+        }
+        return result;
+    }
+
+    Map<String, Set<UUID>> getIndices() {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            Map<String, Set<UUID>> index = new HashMap<>();
+            for (TarReader reader : readers) {
+                index.put(reader.getFile().getAbsolutePath(), reader.getUUIDs());
+            }
+            return index;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    void traverseSegmentGraph(Set<UUID> roots, SegmentGraphVisitor visitor) throws IOException {
+        lock.readLock().lock();
+        try {
+            checkOpen();
+            includeForwardReferences(readers, roots);
+            for (TarReader reader : readers) {
+                reader.traverseSegmentGraph(roots, visitor);
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+}
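
Taken together, a hedged end-to-end sketch of the facade's lifecycle from
within the same package (directory, recovery, ioMonitor, stats, the segment
inputs id/data/generation/references/binaryReferences, and the GC inputs
bulkRefs and reclaimGeneration are assumed to be in scope; exception handling
elided):

    TarFiles tarFiles = TarFiles.builder()
            .withDirectory(directory)
            .withTarRecovery(recovery)
            .withIOMonitor(ioMonitor)
            .withFileStoreStats(stats)
            .withMaxFileSize(256 * 1024 * 1024)   // an assumed value
            .withMemoryMapping(false)
            .build();
    try {
        // Write a segment together with its graph and binary references.
        tarFiles.writeSegment(id, data, 0, data.length, generation,
                references, binaryReferences);
        tarFiles.flush();

        // Read it back; a null return means "not found" and is mapped to
        // SegmentNotFoundException by AbstractFileStore#readSegmentUncached.
        ByteBuffer buffer = tarFiles.readSegment(
                id.getMostSignificantBits(), id.getLeastSignificantBits());

        // Mark/sweep cleanup across all TAR files.
        TarFiles.CleanupResult result =
                tarFiles.cleanup(bulkRefs, reclaimGeneration);
        long reclaimed = result.getReclaimedSize();
    } finally {
        tarFiles.close();
    }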

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java
------------------------------------------------------------------------------
    svn:eol-style = native

