Author: frm
Date: Mon Apr 10 15:01:00 2017
New Revision: 1790844
URL: http://svn.apache.org/viewvc?rev=1790844&view=rev
Log:
OAK-6052 - Exploit immutability of TAR readers to avoid unnecessary locking
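The readers are now kept in an immutable, singly linked list of Node cells
instead of a mutable List<TarReader>. Because a Node can never change after
construction, callers only need the lock for the instant it takes to snapshot
the head pointer; the traversal itself can run without any lock. The sketch
below illustrates the pattern in isolation (a minimal, hypothetical example,
not the committed code; a String stands in for the immutable TarReader):

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class SnapshotListExample {

        // An immutable cons cell: once published, neither field can change.
        private static class Node {
            final String reader; // placeholder for an immutable TarReader
            final Node next;

            Node(String reader, Node next) {
                this.reader = reader;
                this.next = next;
            }
        }

        private final ReadWriteLock lock = new ReentrantReadWriteLock();

        private Node readers; // guarded by lock

        void add(String reader) {
            lock.writeLock().lock();
            try {
                // Prepend by allocating a new head; existing cells are
                // shared untouched with any concurrent traversal.
                readers = new Node(reader, readers);
            } finally {
                lock.writeLock().unlock();
            }
        }

        int count() {
            // The lock is held only long enough to snapshot the head ...
            Node n;
            lock.readLock().lock();
            try {
                n = readers;
            } finally {
                lock.readLock().unlock();
            }
            // ... the walk is lock-free: nothing reachable from n can mutate.
            int count = 0;
            while (n != null) {
                count++;
                n = n.next;
            }
            return count;
        }

        public static void main(String[] args) {
            SnapshotListExample list = new SnapshotListExample();
            list.add("data00000a.tar");
            list.add("data00001a.tar");
            System.out.println(list.count()); // prints 2
        }
    }

This is why most methods below (size(), readerCount(), containsSegment(),
readSegment(), and so on) share the same two-phase shape: snapshot the head
under the read lock, then work on the snapshot outside the lock.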
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java?rev=1790844&r1=1790843&r2=1790844&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java Mon Apr 10 15:01:00 2017
@@ -20,10 +20,7 @@ package org.apache.jackrabbit.oak.segmen
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Maps.newLinkedHashMap;
import static com.google.common.collect.Sets.newHashSet;
import static org.apache.commons.io.FileUtils.listFiles;
@@ -35,6 +32,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -46,7 +44,6 @@ import java.util.regex.Pattern;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
-import com.google.common.io.Closer;
import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
import org.slf4j.Logger;
@@ -54,6 +51,18 @@ import org.slf4j.LoggerFactory;
class TarFiles implements Closeable {
+ private static class Node {
+
+ final TarReader reader;
+
+ final Node next;
+
+ Node(TarReader reader, Node next) {
+ this.reader = reader;
+ this.next = next;
+ }
+ }
+
static class CleanupResult {
private boolean interrupted;
@@ -153,6 +162,15 @@ class TarFiles implements Closeable {
private static final Pattern FILE_NAME_PATTERN = Pattern.compile("(data)((0|[1-9][0-9]*)[0-9]{4})([a-z])?.tar");
+ private static Node reverse(Node n) {
+ Node r = null;
+ while (n != null) {
+ r = new Node(n.reader, r);
+ n = n.next;
+ }
+ return r;
+ }
+
private static Map<Integer, Map<Character, File>> collectFiles(File directory) {
Map<Integer, Map<Character, File>> dataFiles = newHashMap();
for (File file : listFiles(directory, null, false)) {
@@ -174,141 +192,175 @@ class TarFiles implements Closeable {
return dataFiles;
}
- /**
- * Include the ids of all segments transitively reachable through forward
- * references from {@code referencedIds}. See OAK-3864.
- */
- private static void includeForwardReferences(Iterable<TarReader> readers, Set<UUID> referencedIds) throws IOException {
- Set<UUID> fRefs = newHashSet(referencedIds);
+ private static void includeForwardReferences(Node head, Set<UUID> referencedIds) throws IOException {
+ Set<UUID> references = newHashSet(referencedIds);
do {
// Add direct forward references
- for (TarReader reader : readers) {
- reader.calculateForwardReferences(fRefs);
- if (fRefs.isEmpty()) {
+ Node n = head;
+ while (n != null) {
+ n.reader.calculateForwardReferences(references);
+ if (references.isEmpty()) {
break; // Optimisation: bail out if no references left
}
+ n = n.next;
}
// ... as long as new forward references are found.
- } while (referencedIds.addAll(fRefs));
+ } while (referencedIds.addAll(references));
}
static Builder builder() {
return new Builder();
}
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
private final long maxFileSize;
private final boolean memoryMapping;
private final IOMonitor ioMonitor;
- private final boolean readOnly;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private List<TarReader> readers;
+ private Node readers;
private TarWriter writer;
private volatile boolean shutdown;
- private boolean closed;
-
private TarFiles(Builder builder) throws IOException {
maxFileSize = builder.maxFileSize;
memoryMapping = builder.memoryMapping;
ioMonitor = builder.ioMonitor;
- readOnly = builder.readOnly;
Map<Integer, Map<Character, File>> map = collectFiles(builder.directory);
- readers = newArrayListWithCapacity(map.size());
Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
Arrays.sort(indices);
- for (int i = indices.length - 1; i >= 0; i--) {
- if (readOnly) {
- readers.add(TarReader.openRO(map.get(indices[i]), memoryMapping, true, builder.tarRecovery, ioMonitor));
+ for (Integer index : indices) {
+ TarReader r;
+ if (builder.readOnly) {
+ r = TarReader.openRO(map.get(index), memoryMapping, true, builder.tarRecovery, ioMonitor);
} else {
- readers.add(TarReader.open(map.get(indices[i]), memoryMapping, builder.tarRecovery, ioMonitor));
+ r = TarReader.open(map.get(index), memoryMapping, builder.tarRecovery, ioMonitor);
}
+ readers = new Node(r, readers);
}
- if (!readOnly) {
- int writeNumber = 0;
- if (indices.length > 0) {
- writeNumber = indices[indices.length - 1] + 1;
- }
- writer = new TarWriter(builder.directory, builder.fileStoreStats, writeNumber, builder.ioMonitor);
+ if (builder.readOnly) {
+ return;
}
- }
-
- private void checkOpen() {
- checkState(!closed, "This instance has been closed");
- }
-
- private void checkReadWrite() {
- checkState(!readOnly, "This instance is read-only");
+ int writeNumber = 0;
+ if (indices.length > 0) {
+ writeNumber = indices[indices.length - 1] + 1;
+ }
+ writer = new TarWriter(builder.directory, builder.fileStoreStats, writeNumber, builder.ioMonitor);
}
@Override
public void close() throws IOException {
shutdown = true;
+
+ TarWriter writer;
+ Node n;
+
lock.writeLock().lock();
try {
- checkOpen();
- closed = true;
- Closer closer = Closer.create();
- closer.register(writer);
- writer = null;
- for (TarReader reader : readers) {
- closer.register(reader);
- }
- readers = null;
- closer.close();
+ writer = this.writer;
+ n = this.readers;
} finally {
lock.writeLock().unlock();
}
+
+ IOException exception = null;
+
+ if (writer != null) {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ exception = e;
+ }
+ }
+
+ while (n != null) {
+ try {
+ n.reader.close();
+ } catch (IOException e) {
+ if (exception == null) {
+ exception = e;
+ } else {
+ exception.addSuppressed(e);
+ }
+ }
+ n = n.next;
+ }
+
+ if (exception != null) {
+ throw exception;
+ }
}
@Override
public String toString() {
+ String w = null;
+ Node n;
+
lock.readLock().lock();
try {
- return "TarFiles{readers=" + readers + ", writer=" + writer + "}";
+ if (writer != null) {
+ w = writer.toString();
+ }
+ n = readers;
} finally {
lock.readLock().unlock();
}
+
+ List<TarReader> rs = new ArrayList<>();
+ while (n != null) {
+ rs.add(n.reader);
+ n = n.next;
+ }
+ return String.format("TarFiles{readers=%s,writer=%s}", rs, w);
}
long size() {
+ long size = 0;
+ Node readers;
+
lock.readLock().lock();
try {
- checkOpen();
- long size = 0;
- if (!readOnly) {
+ if (writer != null) {
size = writer.fileLength();
}
- for (TarReader reader : readers) {
- size += reader.size();
- }
- return size;
+ readers = this.readers;
} finally {
lock.readLock().unlock();
}
+
+ Node n = readers;
+ while (n != null) {
+ size += n.reader.size();
+ n = n.next;
+ }
+ return size;
}
int readerCount() {
+ Node n;
+
lock.readLock().lock();
try {
- checkOpen();
- return readers.size();
+ n = readers;
} finally {
lock.readLock().unlock();
}
+
+ int count = 0;
+ while (n != null) {
+ count++;
+ n = n.next;
+ }
+ return count;
}
void flush() throws IOException {
- checkReadWrite();
lock.readLock().lock();
try {
- checkOpen();
writer.flush();
} finally {
lock.readLock().unlock();
@@ -316,56 +368,63 @@ class TarFiles implements Closeable {
}
boolean containsSegment(long msb, long lsb) {
+ Node n;
+
lock.readLock().lock();
try {
- checkOpen();
- if (!readOnly) {
+ if (writer != null) {
if (writer.containsEntry(msb, lsb)) {
return true;
}
}
- for (TarReader reader : readers) {
- if (reader.containsEntry(msb, lsb)) {
- return true;
- }
- }
- return false;
+ n = this.readers;
} finally {
lock.readLock().unlock();
}
+
+ while (n != null) {
+ if (n.reader.containsEntry(msb, lsb)) {
+ return true;
+ }
+ n = n.next;
+ }
+ return false;
}
ByteBuffer readSegment(long msb, long lsb) {
- lock.readLock().lock();
try {
- checkOpen();
+ Node n;
+
+ lock.readLock().lock();
try {
- if (!readOnly) {
- ByteBuffer buffer = writer.readEntry(msb, lsb);
- if (buffer != null) {
- return buffer;
+ if (writer != null) {
+ ByteBuffer b = writer.readEntry(msb, lsb);
+ if (b != null) {
+ return b;
}
}
- for (TarReader reader : readers) {
- ByteBuffer buffer = reader.readEntry(msb, lsb);
- if (buffer != null) {
- return buffer;
- }
+ n = readers;
+ } finally {
+ lock.readLock().unlock();
+ }
+
+ while (n != null) {
+ ByteBuffer b = n.reader.readEntry(msb, lsb);
+ if (b != null) {
+ return b;
}
- } catch (IOException e) {
- log.warn("Unable to read from TAR file {}", writer, e);
+ n = n.next;
}
- return null;
- } finally {
- lock.readLock().unlock();
+ } catch (IOException e) {
+ log.warn("Unable to read from TAR file", e);
}
+
+ return null;
}
void writeSegment(UUID id, byte[] buffer, int offset, int length, int generation, Set<UUID> references, Set<String> binaryReferences) throws IOException {
- checkReadWrite();
lock.writeLock().lock();
try {
- checkOpen();
long size = writer.writeEntry(
id.getMostSignificantBits(),
id.getLeastSignificantBits(),
@@ -397,172 +456,179 @@ class TarFiles implements Closeable {
if (newWriter == writer) {
return;
}
- File writeFile = writer.getFile();
- List<TarReader> list = newArrayListWithCapacity(1 + readers.size());
- list.add(TarReader.open(writeFile, memoryMapping, ioMonitor));
- list.addAll(readers);
- readers = list;
+ readers = new Node(TarReader.open(writer.getFile(), memoryMapping, ioMonitor), readers);
writer = newWriter;
}
- CleanupResult cleanup(Supplier<Set<UUID>> referencesSupplier, Predicate<Integer> reclaimGeneration) throws IOException {
- checkReadWrite();
-
+ CleanupResult cleanup(Supplier<Set<UUID>> referencesSupplier, Predicate<Integer> reclaimPredicate) throws IOException {
CleanupResult result = new CleanupResult();
result.removableFiles = new ArrayList<>();
result.reclaimedSegmentIds = new HashSet<>();
- Map<TarReader, TarReader> cleaned = newLinkedHashMap();
+ Set<UUID> references;
+ Node readers;
lock.writeLock().lock();
lock.readLock().lock();
try {
try {
- checkOpen();
newWriter();
} finally {
lock.writeLock().unlock();
}
-
- // At this point the write lock is downgraded to a read lock for
- // better concurrency. It is always necessary to access TarReader
- // and TarWriter instances while holding a lock (either in read or
- // write mode) to prevent a concurrent #close(). In this case, we
- // don't need an exclusive access to the TarReader instances.
-
- // TODO now that the two protected sections have been merged thanks
- // to lock downgrading, check if the following code can be
- // simplified.
-
- for (TarReader reader : readers) {
- cleaned.put(reader, reader);
- result.reclaimedSize += reader.size();
- }
-
- // The set of references has to be computed while holding the lock.
- // This prevents a time-of-check to time-of-use race condition. See
- // OAK-6046 for further details.
-
- Set<UUID> references = referencesSupplier.get();
-
- Set<UUID> reclaim = newHashSet();
- for (TarReader reader : cleaned.keySet()) {
- if (shutdown) {
- result.interrupted = true;
- return result;
- }
- reader.mark(references, reclaim, reclaimGeneration);
- log.info("{}: size of bulk references/reclaim set {}/{}", reader, references.size(), reclaim.size());
- }
- for (TarReader reader : cleaned.keySet()) {
- if (shutdown) {
- result.interrupted = true;
- return result;
- }
- cleaned.put(reader, reader.sweep(reclaim, result.reclaimedSegmentIds));
- }
+ readers = this.readers;
+ references = referencesSupplier.get();
} finally {
lock.readLock().unlock();
}
- List<TarReader> oldReaders = newArrayList();
- lock.writeLock().lock();
- try {
- // Replace current list of reader with the cleaned readers taking care not to lose
- // any new reader that might have come in through concurrent calls to newWriter()
- checkOpen();
- List<TarReader> sweptReaders = newArrayList();
- for (TarReader reader : readers) {
- if (cleaned.containsKey(reader)) {
- TarReader newReader = cleaned.get(reader);
- if (newReader != null) {
- sweptReaders.add(newReader);
- result.reclaimedSize -= newReader.size();
+ Map<TarReader, TarReader> cleaned = new LinkedHashMap<>();
+
+ {
+ Node n = readers;
+ while (n != null) {
+ cleaned.put(n.reader, n.reader);
+ result.reclaimedSize += n.reader.size();
+ n = n.next;
+ }
+ }
+
+ Set<UUID> reclaim = newHashSet();
+
+ for (TarReader reader : cleaned.keySet()) {
+ if (shutdown) {
+ result.interrupted = true;
+ return result;
+ }
+ reader.mark(references, reclaim, reclaimPredicate);
+ }
+
+ for (TarReader reader : cleaned.keySet()) {
+ if (shutdown) {
+ result.interrupted = true;
+ return result;
+ }
+ cleaned.put(reader, reader.sweep(reclaim, result.reclaimedSegmentIds));
+ }
+
+ Node closeables;
+ long reclaimed;
+
+ while (true) {
+ closeables = null;
+ reclaimed = 0;
+
+ Node swept = null;
+ Node n = readers;
+ while (n != null) {
+ if (cleaned.containsKey(n.reader)) {
+ TarReader r = cleaned.get(n.reader);
+ if (r != null) {
+ swept = new Node(r, swept);
+ reclaimed += r.size();
}
- // if these two differ, the former represents the swept version of the latter
- if (newReader != reader) {
- oldReaders.add(reader);
+ if (r != n.reader) {
+ closeables = new Node(n.reader, closeables);
}
} else {
- sweptReaders.add(reader);
+ swept = new Node(n.reader, swept);
}
+ n = n.next;
}
- readers = sweptReaders;
- } finally {
- lock.writeLock().unlock();
- }
+ swept = reverse(swept);
- for (TarReader oldReader : oldReaders) {
+ lock.writeLock().lock();
try {
- oldReader.close();
- } catch (IOException e) {
- log.error("Unable to close swept TAR reader", e);
+ if (this.readers == readers) {
+ this.readers = swept;
+ break;
+ } else {
+ readers = this.readers;
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ result.reclaimedSize -= reclaimed;
+
+ {
+ Node n = closeables;
+ while (n != null) {
+ try {
+ n.reader.close();
+ } catch (IOException e) {
+ log.warn("Unable to close swept TAR reader", e);
+ }
+ result.removableFiles.add(n.reader.getFile());
+ n = n.next;
}
- result.removableFiles.add(oldReader.getFile());
}
return result;
}
void collectBlobReferences(ReferenceCollector collector, int minGeneration) throws IOException {
+ Node n;
+
lock.writeLock().lock();
- lock.readLock().lock();
try {
- try {
- checkOpen();
- if (!readOnly) {
- newWriter();
- }
- } finally {
- lock.writeLock().unlock();
- }
-
- // At this point the write lock is downgraded to a read lock for
- // better concurrency. It is always necessary to access TarReader
- // and TarWriter instances while holding a lock (either in read or
- // write mode) to prevent a concurrent #close(). In this case, we
- // don't need an exclusive access to the TarReader instances.
-
- for (TarReader reader : readers) {
- reader.collectBlobReferences(collector, minGeneration);
+ if (writer != null) {
+ newWriter();
}
+ n = readers;
} finally {
- lock.readLock().unlock();
+ lock.writeLock().unlock();
+ }
+
+ while (n != null) {
+ n.reader.collectBlobReferences(collector, minGeneration);
+ n = n.next;
}
}
Iterable<UUID> getSegmentIds() {
+ Node n;
+
lock.readLock().lock();
try {
- checkOpen();
- List<UUID> ids = new ArrayList<>();
- for (TarReader reader : readers) {
- ids.addAll(reader.getUUIDs());
- }
- return ids;
+ n = readers;
} finally {
lock.readLock().unlock();
}
+
+ List<UUID> ids = new ArrayList<>();
+ while (n != null) {
+ ids.addAll(n.reader.getUUIDs());
+ n = n.next;
+ }
+ return ids;
}
Map<UUID, List<UUID>> getGraph(String fileName) throws IOException {
- Set<UUID> index = null;
- Map<UUID, List<UUID>> graph = null;
+ Node n;
lock.readLock().lock();
try {
- checkOpen();
- for (TarReader reader : readers) {
- if (fileName.equals(reader.getFile().getName())) {
- index = reader.getUUIDs();
- graph = reader.getGraph(false);
- break;
- }
- }
+ n = readers;
} finally {
lock.readLock().unlock();
}
+ Set<UUID> index = null;
+ Map<UUID, List<UUID>> graph = null;
+
+ while (n != null) {
+ TarReader r = n.reader;
+ if (fileName.equals(r.getFile().getName())) {
+ index = r.getUUIDs();
+ graph = r.getGraph(false);
+ break;
+ }
+ n = n.next;
+ }
+
Map<UUID, List<UUID>> result = new HashMap<>();
if (index != null) {
for (UUID uuid : index) {
@@ -576,30 +642,38 @@ class TarFiles implements Closeable {
}
Map<String, Set<UUID>> getIndices() {
+ Node n;
+
lock.readLock().lock();
try {
- checkOpen();
- Map<String, Set<UUID>> index = new HashMap<>();
- for (TarReader reader : readers) {
- index.put(reader.getFile().getAbsolutePath(), reader.getUUIDs());
- }
- return index;
+ n = readers;
} finally {
lock.readLock().unlock();
}
+
+ Map<String, Set<UUID>> index = new HashMap<>();
+ while (n != null) {
+ index.put(n.reader.getFile().getAbsolutePath(), n.reader.getUUIDs());
+ n = n.next;
+ }
+ return index;
}
void traverseSegmentGraph(Set<UUID> roots, SegmentGraphVisitor visitor) throws IOException {
+ Node n;
+
lock.readLock().lock();
try {
- checkOpen();
- includeForwardReferences(readers, roots);
- for (TarReader reader : readers) {
- reader.traverseSegmentGraph(roots, visitor);
- }
+ n = readers;
} finally {
lock.readLock().unlock();
}
+
+ includeForwardReferences(n, roots);
+ while (n != null) {
+ n.reader.traverseSegmentGraph(roots, visitor);
+ n = n.next;
+ }
}
}
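
Note on the retry loop in cleanup(): the swept list is computed without
holding the write lock, so a concurrent newWriter() may prepend a new reader
in the meantime. The loop therefore re-checks under the write lock that the
head is still the snapshot it started from (this.readers == readers) and, if
not, retries against the new head. This is plain optimistic concurrency
control; the hypothetical sketch below shows the same shape with an
AtomicReference standing in for the lock-protected head (illustration only,
not the committed code):

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.UnaryOperator;

    public class OptimisticRetryExample {

        public static void main(String[] args) {
            // The shared head; TarFiles guards it with a lock instead.
            AtomicReference<String> head = new AtomicReference<>("a,b,c");

            // A stand-in for the expensive sweep of the reader list.
            UnaryOperator<String> sweep = s -> s.replace("b,", "");

            String snapshot;
            String swept;
            do {
                snapshot = head.get();         // snapshot the current head
                swept = sweep.apply(snapshot); // expensive work, no lock held
                // Install the result only if nothing moved the head in the
                // meantime; otherwise loop and redo against the new head.
            } while (!head.compareAndSet(snapshot, swept));

            System.out.println(head.get()); // prints "a,c"
        }
    }

The reference comparison is what makes the check cheap: because Node lists
are immutable, an unchanged head pointer guarantees an unchanged list.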