Author: frm
Date: Mon Apr 10 15:01:00 2017
New Revision: 1790844

URL: http://svn.apache.org/viewvc?rev=1790844&view=rev
Log:
OAK-6052 - Exploit immutability of TAR readers to avoid unnecessary locking

Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java?rev=1790844&r1=1790843&r2=1790844&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java Mon Apr 10 15:01:00 2017
@@ -20,10 +20,7 @@ package org.apache.jackrabbit.oak.segmen
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
 import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Maps.newLinkedHashMap;
 import static com.google.common.collect.Sets.newHashSet;
 import static org.apache.commons.io.FileUtils.listFiles;
 
@@ -35,6 +32,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,7 +44,6 @@ import java.util.regex.Pattern;
 
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
-import com.google.common.io.Closer;
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
 import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
 import org.slf4j.Logger;
@@ -54,6 +51,18 @@ import org.slf4j.LoggerFactory;
 
 class TarFiles implements Closeable {
 
+    private static class Node {
+
+        final TarReader reader;
+
+        final Node next;
+
+        Node(TarReader reader, Node next) {
+            this.reader = reader;
+            this.next = next;
+        }
+    }
+
     static class CleanupResult {
 
         private boolean interrupted;
@@ -153,6 +162,15 @@ class TarFiles implements Closeable {
 
     private static final Pattern FILE_NAME_PATTERN = 
Pattern.compile("(data)((0|[1-9][0-9]*)[0-9]{4})([a-z])?.tar");
 
+    private static Node reverse(Node n) {
+        Node r = null;
+        while (n != null) {
+            r = new Node(n.reader, r);
+            n = n.next;
+        }
+        return r;
+    }
+
     private static Map<Integer, Map<Character, File>> collectFiles(File 
directory) {
         Map<Integer, Map<Character, File>> dataFiles = newHashMap();
         for (File file : listFiles(directory, null, false)) {
@@ -174,141 +192,175 @@ class TarFiles implements Closeable {
         return dataFiles;
     }
 
-    /**
-     * Include the ids of all segments transitively reachable through forward
-     * references from {@code referencedIds}. See OAK-3864.
-     */
-    private static void includeForwardReferences(Iterable<TarReader> readers, 
Set<UUID> referencedIds) throws IOException {
-        Set<UUID> fRefs = newHashSet(referencedIds);
+    private static void includeForwardReferences(Node head, Set<UUID> 
referencedIds) throws IOException {
+        Set<UUID> references = newHashSet(referencedIds);
         do {
             // Add direct forward references
-            for (TarReader reader : readers) {
-                reader.calculateForwardReferences(fRefs);
-                if (fRefs.isEmpty()) {
+            Node n = head;
+            while (n != null) {
+                n.reader.calculateForwardReferences(references);
+                if (references.isEmpty()) {
                     break; // Optimisation: bail out if no references left
                 }
+                n = n.next;
             }
             // ... as long as new forward references are found.
-        } while (referencedIds.addAll(fRefs));
+        } while (referencedIds.addAll(references));
     }
 
     static Builder builder() {
         return new Builder();
     }
 
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
     private final long maxFileSize;
 
     private final boolean memoryMapping;
 
     private final IOMonitor ioMonitor;
 
-    private final boolean readOnly;
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    private List<TarReader> readers;
+    private Node readers;
 
     private TarWriter writer;
 
     private volatile boolean shutdown;
 
-    private boolean closed;
-
     private TarFiles(Builder builder) throws IOException {
         maxFileSize = builder.maxFileSize;
         memoryMapping = builder.memoryMapping;
         ioMonitor = builder.ioMonitor;
-        readOnly = builder.readOnly;
         Map<Integer, Map<Character, File>> map = 
collectFiles(builder.directory);
-        readers = newArrayListWithCapacity(map.size());
         Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
         Arrays.sort(indices);
-        for (int i = indices.length - 1; i >= 0; i--) {
-            if (readOnly) {
-                readers.add(TarReader.openRO(map.get(indices[i]), 
memoryMapping, true, builder.tarRecovery, ioMonitor));
+        for (Integer index : indices) {
+            TarReader r;
+            if (builder.readOnly) {
+                r = TarReader.openRO(map.get(index), memoryMapping, true, 
builder.tarRecovery, ioMonitor);
             } else {
-                readers.add(TarReader.open(map.get(indices[i]), memoryMapping, 
builder.tarRecovery, ioMonitor));
+                r = TarReader.open(map.get(index), memoryMapping, 
builder.tarRecovery, ioMonitor);
             }
+            readers = new Node(r, readers);
         }
-        if (!readOnly) {
-            int writeNumber = 0;
-            if (indices.length > 0) {
-                writeNumber = indices[indices.length - 1] + 1;
-            }
-            writer = new TarWriter(builder.directory, builder.fileStoreStats, 
writeNumber, builder.ioMonitor);
+        if (builder.readOnly) {
+            return;
         }
-    }
-
-    private void checkOpen() {
-        checkState(!closed, "This instance has been closed");
-    }
-
-    private void checkReadWrite() {
-        checkState(!readOnly, "This instance is read-only");
+        int writeNumber = 0;
+        if (indices.length > 0) {
+            writeNumber = indices[indices.length - 1] + 1;
+        }
+        writer = new TarWriter(builder.directory, builder.fileStoreStats, 
writeNumber, builder.ioMonitor);
     }
 
     @Override
     public void close() throws IOException {
         shutdown = true;
+
+        TarWriter writer;
+        Node n;
+
         lock.writeLock().lock();
         try {
-            checkOpen();
-            closed = true;
-            Closer closer = Closer.create();
-            closer.register(writer);
-            writer = null;
-            for (TarReader reader : readers) {
-                closer.register(reader);
-            }
-            readers = null;
-            closer.close();
+            writer = this.writer;
+            n = this.readers;
         } finally {
             lock.writeLock().unlock();
         }
+
+        IOException exception = null;
+
+        if (writer != null) {
+            try {
+                writer.close();
+            } catch (IOException e) {
+                exception = e;
+            }
+        }
+
+        while (n != null) {
+            try {
+                n.reader.close();
+            } catch (IOException e) {
+                if (exception == null) {
+                    exception = e;
+                } else {
+                    exception.addSuppressed(e);
+                }
+            }
+            n = n.next;
+        }
+
+        if (exception != null) {
+            throw exception;
+        }
     }
 
     @Override
     public String toString() {
+        String w = null;
+        Node n;
+
         lock.readLock().lock();
         try {
-            return "TarFiles{readers=" + readers + ", writer=" + writer + "}";
+            if (writer != null) {
+                w = writer.toString();
+            }
+            n = readers;
         } finally {
             lock.readLock().unlock();
         }
+
+        List<TarReader> rs = new ArrayList<>();
+        while (n != null) {
+            rs.add(n.reader);
+            n = n.next;
+        }
+        return String.format("TarFiles{readers=%s,writer=%s}", rs, w); // close the '{' opened after "TarFiles"
     }
 
     long size() {
+        long size = 0;
+        Node readers;
+
         lock.readLock().lock();
         try {
-            checkOpen();
-            long size = 0;
-            if (!readOnly) {
+            if (writer != null) {
                 size = writer.fileLength();
             }
-            for (TarReader reader : readers) {
-                size += reader.size();
-            }
-            return size;
+            readers = this.readers;
         } finally {
             lock.readLock().unlock();
         }
+
+        Node n = readers;
+        while (n != null) {
+            size += n.reader.size();
+            n = n.next;
+        }
+        return size;
     }
 
     int readerCount() {
+        Node n;
+
         lock.readLock().lock();
         try {
-            checkOpen();
-            return readers.size();
+            n = readers;
         } finally {
             lock.readLock().unlock();
         }
+
+        int count = 0;
+        while (n != null) {
+            count++;
+            n = n.next;
+        }
+        return count;
     }
 
     void flush() throws IOException {
-        checkReadWrite();
         lock.readLock().lock();
         try {
-            checkOpen();
             writer.flush();
         } finally {
             lock.readLock().unlock();
@@ -316,56 +368,63 @@ class TarFiles implements Closeable {
     }
 
     boolean containsSegment(long msb, long lsb) {
+        Node n;
+
         lock.readLock().lock();
         try {
-            checkOpen();
-            if (!readOnly) {
+            if (writer != null) {
                 if (writer.containsEntry(msb, lsb)) {
                     return true;
                 }
             }
-            for (TarReader reader : readers) {
-                if (reader.containsEntry(msb, lsb)) {
-                    return true;
-                }
-            }
-            return false;
+            n = this.readers;
         } finally {
             lock.readLock().unlock();
         }
+
+        while (n != null) {
+            if (n.reader.containsEntry(msb, lsb)) {
+                return true;
+            }
+            n = n.next;
+        }
+        return false;
     }
 
     ByteBuffer readSegment(long msb, long lsb) {
-        lock.readLock().lock();
         try {
-            checkOpen();
+            Node n;
+
+            lock.readLock().lock();
             try {
-                if (!readOnly) {
-                    ByteBuffer buffer = writer.readEntry(msb, lsb);
-                    if (buffer != null) {
-                        return buffer;
+                if (writer != null) {
+                    ByteBuffer b = writer.readEntry(msb, lsb);
+                    if (b != null) {
+                        return b;
                     }
                 }
-                for (TarReader reader : readers) {
-                    ByteBuffer buffer = reader.readEntry(msb, lsb);
-                    if (buffer != null) {
-                        return buffer;
-                    }
+                n = readers;
+            } finally {
+                lock.readLock().unlock();
+            }
+
+            while (n != null) {
+                ByteBuffer b = n.reader.readEntry(msb, lsb);
+                if (b != null) {
+                    return b;
                 }
-            } catch (IOException e) {
-                log.warn("Unable to read from TAR file {}", writer, e);
+                n = n.next;
             }
-            return null;
-        } finally {
-            lock.readLock().unlock();
+        } catch (IOException e) {
+            log.warn("Unable to read from TAR file", e);
         }
+
+        return null;
     }
 
     void writeSegment(UUID id, byte[] buffer, int offset, int length, int 
generation, Set<UUID> references, Set<String> binaryReferences) throws 
IOException {
-        checkReadWrite();
         lock.writeLock().lock();
         try {
-            checkOpen();
             long size = writer.writeEntry(
                     id.getMostSignificantBits(),
                     id.getLeastSignificantBits(),
@@ -397,172 +456,179 @@ class TarFiles implements Closeable {
         if (newWriter == writer) {
             return;
         }
-        File writeFile = writer.getFile();
-        List<TarReader> list = newArrayListWithCapacity(1 + readers.size());
-        list.add(TarReader.open(writeFile, memoryMapping, ioMonitor));
-        list.addAll(readers);
-        readers = list;
+        readers = new Node(TarReader.open(writer.getFile(), memoryMapping, 
ioMonitor), readers);
         writer = newWriter;
     }
 
-    CleanupResult cleanup(Supplier<Set<UUID>> referencesSupplier, 
Predicate<Integer> reclaimGeneration) throws IOException {
-        checkReadWrite();
-
+    CleanupResult cleanup(Supplier<Set<UUID>> referencesSupplier, 
Predicate<Integer> reclaimPredicate) throws IOException {
         CleanupResult result = new CleanupResult();
         result.removableFiles = new ArrayList<>();
         result.reclaimedSegmentIds = new HashSet<>();
 
-        Map<TarReader, TarReader> cleaned = newLinkedHashMap();
+        Set<UUID> references;
+        Node readers;
 
         lock.writeLock().lock();
         lock.readLock().lock();
+        ;
         try {
             try {
-                checkOpen();
                 newWriter();
             } finally {
                 lock.writeLock().unlock();
             }
-
-            // At this point the write lock is downgraded to a read lock for
-            // better concurrency. It is always necessary to access TarReader
-            // and TarWriter instances while holding a lock (either in read or
-            // write mode) to prevent a concurrent #close(). In this case, we
-            // don't need an exclusive access to the TarReader instances.
-
-            // TODO now that the two protected sections have been merged thanks
-            // to lock downgrading, check if the following code can be
-            // simplified.
-
-            for (TarReader reader : readers) {
-                cleaned.put(reader, reader);
-                result.reclaimedSize += reader.size();
-            }
-
-            // The set of references has to be computed while holding the lock.
-            // This prevents a time-of-check to time-of-use race condition. See
-            // OAK-6046 for further details.
-
-            Set<UUID> references = referencesSupplier.get();
-
-            Set<UUID> reclaim = newHashSet();
-            for (TarReader reader : cleaned.keySet()) {
-                if (shutdown) {
-                    result.interrupted = true;
-                    return result;
-                }
-                reader.mark(references, reclaim, reclaimGeneration);
-                log.info("{}: size of bulk references/reclaim set {}/{}", 
reader, references.size(), reclaim.size());
-            }
-            for (TarReader reader : cleaned.keySet()) {
-                if (shutdown) {
-                    result.interrupted = true;
-                    return result;
-                }
-                cleaned.put(reader, reader.sweep(reclaim, 
result.reclaimedSegmentIds));
-            }
+            readers = this.readers;
+            references = referencesSupplier.get();
         } finally {
             lock.readLock().unlock();
         }
 
-        List<TarReader> oldReaders = newArrayList();
-        lock.writeLock().lock();
-        try {
-            // Replace current list of reader with the cleaned readers taking 
care not to lose
-            // any new reader that might have come in through concurrent calls 
to newWriter()
-            checkOpen();
-            List<TarReader> sweptReaders = newArrayList();
-            for (TarReader reader : readers) {
-                if (cleaned.containsKey(reader)) {
-                    TarReader newReader = cleaned.get(reader);
-                    if (newReader != null) {
-                        sweptReaders.add(newReader);
-                        result.reclaimedSize -= newReader.size();
+        Map<TarReader, TarReader> cleaned = new LinkedHashMap<>();
+
+        {
+            Node n = readers;
+            while (n != null) {
+                cleaned.put(n.reader, n.reader);
+                result.reclaimedSize += n.reader.size();
+                n = n.next;
+            }
+        }
+
+        Set<UUID> reclaim = newHashSet();
+
+        for (TarReader reader : cleaned.keySet()) {
+            if (shutdown) {
+                result.interrupted = true;
+                return result;
+            }
+            reader.mark(references, reclaim, reclaimPredicate);
+        }
+
+        for (TarReader reader : cleaned.keySet()) {
+            if (shutdown) {
+                result.interrupted = true;
+                return result;
+            }
+            cleaned.put(reader, reader.sweep(reclaim, 
result.reclaimedSegmentIds));
+        }
+
+        Node closeables;
+        long reclaimed;
+
+        while (true) {
+            closeables = null;
+            reclaimed = 0;
+
+            Node swept = null;
+            Node n = readers;
+            while (n != null) {
+                if (cleaned.containsKey(n.reader)) {
+                    TarReader r = cleaned.get(n.reader);
+                    if (r != null) {
+                        swept = new Node(r, swept);
+                        reclaimed += r.size();
                     }
-                    // if these two differ, the former represents the swept 
version of the latter
-                    if (newReader != reader) {
-                        oldReaders.add(reader);
+                    if (r != n.reader) {
+                        closeables = new Node(n.reader, closeables);
                     }
                 } else {
-                    sweptReaders.add(reader);
+                    swept = new Node(n.reader, swept);
                 }
+                n = n.next;
             }
-            readers = sweptReaders;
-        } finally {
-            lock.writeLock().unlock();
-        }
+            swept = reverse(swept);
 
-        for (TarReader oldReader : oldReaders) {
+            lock.writeLock().lock();
             try {
-                oldReader.close();
-            } catch (IOException e) {
-                log.error("Unable to close swept TAR reader", e);
+                if (this.readers == readers) {
+                    this.readers = swept;
+                    break;
+                } else {
+                    readers = this.readers;
+                }
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        result.reclaimedSize -= reclaimed;
+
+        {
+            Node n = closeables;
+            while (n != null) {
+                try {
+                    n.reader.close();
+                } catch (IOException e) {
+                    log.warn("Unable to close swept TAR reader", e);
+                }
+                result.removableFiles.add(n.reader.getFile());
+                n = n.next;
             }
-            result.removableFiles.add(oldReader.getFile());
         }
 
         return result;
     }
 
     void collectBlobReferences(ReferenceCollector collector, int 
minGeneration) throws IOException {
+        Node n;
+
         lock.writeLock().lock();
-        lock.readLock().lock();
         try {
-            try {
-                checkOpen();
-                if (!readOnly) {
-                    newWriter();
-                }
-            } finally {
-                lock.writeLock().unlock();
-            }
-
-            // At this point the write lock is downgraded to a read lock for
-            // better concurrency. It is always necessary to access TarReader
-            // and TarWriter instances while holding a lock (either in read or
-            // write mode) to prevent a concurrent #close(). In this case, we
-            // don't need an exclusive access to the TarReader instances.
-
-            for (TarReader reader : readers) {
-                reader.collectBlobReferences(collector, minGeneration);
+            if (writer != null) {
+                newWriter();
             }
+            n = readers;
         } finally {
-            lock.readLock().unlock();
+            lock.writeLock().unlock();
+        }
+
+        while (n != null) {
+            n.reader.collectBlobReferences(collector, minGeneration);
+            n = n.next;
         }
     }
 
     Iterable<UUID> getSegmentIds() {
+        Node n;
+
         lock.readLock().lock();
         try {
-            checkOpen();
-            List<UUID> ids = new ArrayList<>();
-            for (TarReader reader : readers) {
-                ids.addAll(reader.getUUIDs());
-            }
-            return ids;
+            n = readers;
         } finally {
             lock.readLock().unlock();
         }
+
+        List<UUID> ids = new ArrayList<>();
+        while (n != null) {
+            ids.addAll(n.reader.getUUIDs());
+            n = n.next;
+        }
+        return ids;
     }
 
     Map<UUID, List<UUID>> getGraph(String fileName) throws IOException {
-        Set<UUID> index = null;
-        Map<UUID, List<UUID>> graph = null;
+        Node n;
 
         lock.readLock().lock();
         try {
-            checkOpen();
-            for (TarReader reader : readers) {
-                if (fileName.equals(reader.getFile().getName())) {
-                    index = reader.getUUIDs();
-                    graph = reader.getGraph(false);
-                    break;
-                }
-            }
+            n = readers;
         } finally {
             lock.readLock().unlock();
         }
 
+        Set<UUID> index = null;
+        Map<UUID, List<UUID>> graph = null;
+
+        while (n != null) {
+            TarReader r = n.reader;
+            if (fileName.equals(r.getFile().getName())) {
+                index = r.getUUIDs();
+                graph = r.getGraph(false);
+                break;
+            }
+            n = n.next;
+        }
+
         Map<UUID, List<UUID>> result = new HashMap<>();
         if (index != null) {
             for (UUID uuid : index) {
@@ -576,30 +642,38 @@ class TarFiles implements Closeable {
     }
 
     Map<String, Set<UUID>> getIndices() {
+        Node n;
+
         lock.readLock().lock();
         try {
-            checkOpen();
-            Map<String, Set<UUID>> index = new HashMap<>();
-            for (TarReader reader : readers) {
-                index.put(reader.getFile().getAbsolutePath(), 
reader.getUUIDs());
-            }
-            return index;
+            n = readers;
         } finally {
             lock.readLock().unlock();
         }
+
+        Map<String, Set<UUID>> index = new HashMap<>();
+        while (n != null) {
+            index.put(n.reader.getFile().getAbsolutePath(), 
n.reader.getUUIDs());
+            n = n.next;
+        }
+        return index;
     }
 
     void traverseSegmentGraph(Set<UUID> roots, SegmentGraphVisitor visitor) 
throws IOException {
+        Node n;
+
         lock.readLock().lock();
         try {
-            checkOpen();
-            includeForwardReferences(readers, roots);
-            for (TarReader reader : readers) {
-                reader.traverseSegmentGraph(roots, visitor);
-            }
+            n = readers;
         } finally {
             lock.readLock().unlock();
         }
+
+        includeForwardReferences(n, roots);
+        while (n != null) {
+            n.reader.traverseSegmentGraph(roots, visitor);
+            n = n.next;
+        }
     }
 
 }


Reply via email to