Author: frm
Date: Tue Apr 4 09:49:13 2017
New Revision: 1790084
URL: http://svn.apache.org/viewvc?rev=1790084&view=rev
Log:
OAK-6002 - Encapsulate TAR files handling in its own class
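For orientation, the new TarFiles class is constructed through a builder. A minimal usage
sketch, assuming the directory, recovery strategy, I/O monitor, and stats objects that
FileStoreBuilder already provides:

    // Sketch only; mirrors the construction added to FileStore below.
    TarFiles tarFiles = TarFiles.builder()
            .withDirectory(directory)         // folder holding data%05d%s.tar files
            .withMemoryMapping(memoryMapping)
            .withTarRecovery(recovery)        // strategy for recovering damaged TAR files
            .withIOMonitor(ioMonitor)
            .withFileStoreStats(stats)        // not required with .withReadOnly()
            .withMaxFileSize(maxFileSize)     // roll over to a new TAR file past this size
            .build();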
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java   (with props)
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java?rev=1790084&r1=1790083&r2=1790084&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java Tue Apr 4 09:49:13 2017
@@ -18,6 +18,7 @@
*/
package org.apache.jackrabbit.oak.segment.file;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Maps.newHashMap;
@@ -25,8 +26,12 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -34,6 +39,7 @@ import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import com.google.common.base.Supplier;
+import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
import org.apache.jackrabbit.oak.segment.RecordType;
@@ -46,6 +52,7 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.SegmentIdFactory;
import org.apache.jackrabbit.oak.segment.SegmentIdProvider;
import org.apache.jackrabbit.oak.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.segment.SegmentReader;
import org.apache.jackrabbit.oak.segment.SegmentStore;
import org.apache.jackrabbit.oak.segment.SegmentTracker;
@@ -76,11 +83,14 @@ public abstract class AbstractFileStore
*/
static final int CURRENT_STORE_VERSION = 1;
- private static final Pattern FILE_NAME_PATTERN =
- Pattern.compile("(data)((0|[1-9][0-9]*)[0-9]{4})([a-z])?.tar");
-
static final String FILE_NAME_FORMAT = "data%05d%s.tar";
+ protected static boolean notEmptyDirectory(File path) {
+ Collection<File> entries = FileUtils.listFiles(path, new String[] {"tar"}, false);
+ checkArgument(entries != null, "%s is not a directory, or an I/O error occurred", path);
+ return entries.size() > 0;
+ }
+
@Nonnull
final SegmentTracker tracker;
@@ -127,7 +137,14 @@ public abstract class AbstractFileStore
this.ioMonitor = builder.getIOMonitor();
}
- File getManifestFile() {
+ static SegmentNotFoundException asSegmentNotFoundException(ExecutionException e, SegmentId id) {
+ if (e.getCause() instanceof SegmentNotFoundException) {
+ return (SegmentNotFoundException) e.getCause();
+ }
+ return new SegmentNotFoundException(id, e);
+ }
+
+ File getManifestFile() {
return new File(directory, MANIFEST_FILE_NAME);
}
@@ -182,35 +199,6 @@ public abstract class AbstractFileStore
return segmentReader.getTemplateCacheStats();
}
- static Map<Integer, Map<Character, File>> collectFiles(File directory) {
- Map<Integer, Map<Character, File>> dataFiles = newHashMap();
-
- for (File file : listFiles(directory)) {
- Matcher matcher = FILE_NAME_PATTERN.matcher(file.getName());
- if (matcher.matches()) {
- Integer index = Integer.parseInt(matcher.group(2));
- Map<Character, File> files = dataFiles.get(index);
- if (files == null) {
- files = newHashMap();
- dataFiles.put(index, files);
- }
- Character generation = 'a';
- if (matcher.group(4) != null) {
- generation = matcher.group(4).charAt(0);
- }
- checkState(files.put(generation, file) == null);
- }
- }
-
- return dataFiles;
- }
-
- @Nonnull
- private static File[] listFiles(File directory) {
- File[] files = directory.listFiles();
- return files == null ? new File[] {} : files;
- }
-
@Nonnull
public abstract SegmentWriter getWriter();
@@ -285,6 +273,29 @@ public abstract class AbstractFileStore
});
}
+ static Set<UUID> readReferences(Segment segment) {
+ Set<UUID> references = new HashSet<>();
+ for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
+ references.add(segment.getReferencedSegmentId(i));
+ }
+ return references;
+ }
+
+ static Set<String> readBinaryReferences(final Segment segment) {
+ final Set<String> binaryReferences = new HashSet<>();
+ segment.forEachRecord(new RecordConsumer() {
+
+ @Override
+ public void consume(int number, RecordType type, int offset) {
+ if (type == RecordType.BLOB_ID) {
+ binaryReferences.add(SegmentBlob.readBlobId(segment, number));
+ }
+ }
+
+ });
+ return binaryReferences;
+ }
+
static void closeAndLogOnFail(Closeable closeable) {
if (closeable != null) {
try {
@@ -296,4 +307,12 @@ public abstract class AbstractFileStore
}
}
+ Segment readSegmentUncached(TarFiles tarFiles, SegmentId id) {
+ ByteBuffer buffer = tarFiles.readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
+ if (buffer == null) {
+ throw new SegmentNotFoundException(id);
+ }
+ return new Segment(tracker, segmentReader, id, buffer);
+ }
+
}
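The helpers above are meant to compose on the write path. A hedged sketch of that
composition, mirroring the FileStore.writeSegment change further down ('id' is the
SegmentId of a data segment, 'data' its raw bytes as a ByteBuffer):

    // Extract the segment graph and binary references once per data segment,
    // then hand everything to TarFiles in a single call.
    Segment segment = new Segment(tracker, segmentReader, id, data);
    int generation = segment.getGcGeneration();
    Set<UUID> references = readReferences(segment);
    Set<String> binaryReferences = readBinaryReferences(segment);
    tarFiles.writeSegment(id.asUUID(), buffer, offset, length,
            generation, references, binaryReferences);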
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1790084&r1=1790083&r2=1790084&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Tue Apr 4 09:49:13 2017
@@ -18,10 +18,6 @@
*/
package org.apache.jackrabbit.oak.segment.file;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static com.google.common.collect.Lists.newLinkedList;
-import static com.google.common.collect.Maps.newLinkedHashMap;
import static com.google.common.collect.Sets.newHashSet;
import static java.lang.Integer.getInteger;
import static java.lang.String.format;
@@ -48,10 +44,7 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
-import java.util.Arrays;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -59,8 +52,6 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
@@ -72,7 +63,6 @@ import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
import org.apache.jackrabbit.oak.segment.Compactor;
@@ -86,6 +76,7 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.WriterCacheManager;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.segment.file.GCJournal.GCJournalEntry;
+import org.apache.jackrabbit.oak.segment.file.TarFiles.CleanupResult;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.slf4j.Logger;
@@ -95,6 +86,7 @@ import org.slf4j.LoggerFactory;
* The storage implementation for tar files.
*/
public class FileStore extends AbstractFileStore {
+
private static final Logger log = LoggerFactory.getLogger(FileStore.class);
/**
@@ -121,9 +113,7 @@ public class FileStore extends AbstractF
@Nonnull
private final GarbageCollector garbageCollector;
- private volatile List<TarReader> readers;
-
- private volatile TarWriter tarWriter;
+ private final TarFiles tarFiles;
private final RandomAccessFile lockFile;
@@ -160,8 +150,6 @@ public class FileStore extends AbstractF
*/
private volatile boolean shutdown;
- private final ReadWriteLock fileStoreLock = new ReentrantReadWriteLock();
-
private final FileStoreStats stats;
@Nonnull
@@ -192,30 +180,25 @@ public class FileStore extends AbstractF
this.garbageCollector = new GarbageCollector(
builder.getGcOptions(), builder.getGcListener(), new GCJournal(directory), builder.getCacheManager());
- Map<Integer, Map<Character, File>> map = collectFiles(directory);
-
Manifest manifest = Manifest.empty();
- if (!map.isEmpty()) {
+ if (notEmptyDirectory(directory)) {
manifest = checkManifest(openManifest());
}
saveManifest(manifest);
- this.readers = newArrayListWithCapacity(map.size());
- Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
- Arrays.sort(indices);
- for (int i = indices.length - 1; i >= 0; i--) {
- readers.add(TarReader.open(map.get(indices[i]), memoryMapping, recovery, ioMonitor));
- }
- this.stats = new FileStoreStats(builder.getStatsProvider(), this, size());
-
- int writeNumber = 0;
- if (indices.length > 0) {
- writeNumber = indices[indices.length - 1] + 1;
- }
- this.tarWriter = new TarWriter(directory, stats, writeNumber, ioMonitor);
-
+ this.stats = new FileStoreStats(builder.getStatsProvider(), this, 0);
+ this.tarFiles = TarFiles.builder()
+ .withDirectory(directory)
+ .withMemoryMapping(memoryMapping)
+ .withTarRecovery(recovery)
+ .withIOMonitor(ioMonitor)
+ .withFileStoreStats(stats)
+ .withMaxFileSize(maxFileSize)
+ .build();
+ this.stats.init(this.tarFiles.size());
+
this.snfeListener = builder.getSnfeListener();
fileStoreScheduler.scheduleAtFixedRate(
@@ -253,7 +236,7 @@ public class FileStore extends AbstractF
}
});
log.info("TarMK opened: {} (mmap={})", directory, memoryMapping);
- log.debug("TarMK readers {}", this.readers);
+ log.debug("TAR files: {}", tarFiles);
}
FileStore bind(TarRevisions revisions) throws IOException {
@@ -309,36 +292,14 @@ public class FileStore extends AbstractF
}
/**
- * @return the size of this store. This method shouldn't be called from
- * a very tight loop as it contents with the {@link #fileStoreLock}.
+ * @return the size of this store.
*/
private long size() {
- List<TarReader> readersSnapshot;
- long writeFileSnapshotSize;
-
- fileStoreLock.readLock().lock();
- try {
- readersSnapshot = ImmutableList.copyOf(readers);
- writeFileSnapshotSize = tarWriter != null ? tarWriter.fileLength() : 0;
- } finally {
- fileStoreLock.readLock().unlock();
- }
-
- long size = writeFileSnapshotSize;
- for (TarReader reader : readersSnapshot) {
- size += reader.size();
- }
-
- return size;
+ return tarFiles.size();
}
public int readerCount(){
- fileStoreLock.readLock().lock();
- try {
- return readers.size();
- } finally {
- fileStoreLock.readLock().unlock();
- }
+ return tarFiles.readerCount();
}
public FileStoreStats getStats() {
@@ -353,9 +314,8 @@ public class FileStore extends AbstractF
@Override
public Void call() throws Exception {
segmentWriter.flush();
- tarWriter.flush();
+ tarFiles.flush();
stats.flushed();
-
return null;
}
});
@@ -453,27 +413,15 @@ public class FileStore extends AbstractF
Closer closer = Closer.create();
closer.register(revisions);
- fileStoreLock.writeLock().lock();
- try {
- if (lock != null) {
- try {
- lock.release();
- } catch (IOException e) {
- log.warn("Unable to release the file lock", e);
- }
- }
- closer.register(lockFile);
-
- List<TarReader> list = readers;
- readers = newArrayList();
- for (TarReader reader : list) {
- closer.register(reader);
+ if (lock != null) {
+ try {
+ lock.release();
+ } catch (IOException e) {
+ log.warn("Unable to release the file lock", e);
}
-
- closer.register(tarWriter);
- } finally {
- fileStoreLock.writeLock().unlock();
}
+ closer.register(lockFile);
+ closer.register(tarFiles);
closeAndLogOnFail(closer);
// Try removing pending files in case the scheduler didn't have a chance to run yet
@@ -485,24 +433,7 @@ public class FileStore extends AbstractF
@Override
public boolean containsSegment(SegmentId id) {
- if (FileStoreUtil.containSegment(readers, id)) {
- return true;
- }
-
- if (tarWriter != null) {
- fileStoreLock.readLock().lock();
- try {
- if (tarWriter.containsEntry(id.getMostSignificantBits(), id.getLeastSignificantBits())) {
- return true;
- }
- } finally {
- fileStoreLock.readLock().unlock();
- }
- }
-
- // the writer might have switched to a new file,
- // so we need to re-check the readers
- return FileStoreUtil.containSegment(readers, id);
+ return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
}
@Override
@@ -512,38 +443,7 @@ public class FileStore extends AbstractF
return segmentCache.getSegment(id, new Callable<Segment>() {
@Override
public Segment call() throws Exception {
- ByteBuffer buffer = FileStoreUtil.readEntry(readers, id);
- if (buffer != null) {
- return new Segment(tracker, segmentReader, id, buffer);
- }
-
- if (tarWriter != null) {
- fileStoreLock.readLock().lock();
- try {
- try {
- buffer = tarWriter.readEntry(id.getMostSignificantBits(), id.getLeastSignificantBits());
- if (buffer != null) {
- return new Segment(tracker, segmentReader, id, buffer);
- }
- } catch (IOException e) {
- log.warn("Failed to read from tar file {}",
tarWriter, e);
- }
- } finally {
- fileStoreLock.readLock().unlock();
- }
- }
-
- // The TarWriter might have become a TarReader in the
- // meantime. Moreover, the TarWriter that became a TarReader
- // might have additional entries. Because of this, we need
- // to check the list of TarReaders once more.
-
- buffer = FileStoreUtil.readEntry(readers, id);
- if (buffer != null) {
- return new Segment(tracker, segmentReader, id, buffer);
- }
-
- throw new SegmentNotFoundException(id);
+ return readSegmentUncached(tarFiles, id);
}
});
} catch (ExecutionException e) {
@@ -553,13 +453,6 @@ public class FileStore extends AbstractF
}
}
- private static SegmentNotFoundException asSegmentNotFoundException(ExecutionException e, SegmentId id) {
- if (e.getCause() instanceof SegmentNotFoundException) {
- return (SegmentNotFoundException) e.getCause();
- }
- return new SegmentNotFoundException(id, e);
- }
-
@Override
public void writeSegment(SegmentId id, byte[] buffer, int offset, int length) throws IOException {
Segment segment = null;
@@ -569,6 +462,9 @@ public class FileStore extends AbstractF
// in an in-memory cache for later use.
int generation = 0;
+ Set<UUID> references = null;
+ Set<String> binaryReferences = null;
+
if (id.isDataSegmentId()) {
ByteBuffer data;
@@ -582,63 +478,26 @@ public class FileStore extends AbstractF
segment = new Segment(tracker, segmentReader, id, data);
generation = segment.getGcGeneration();
+ references = readReferences(segment);
+ binaryReferences = readBinaryReferences(segment);
}
- fileStoreLock.writeLock().lock();
- try {
- // Flush the segment to disk
-
- long size = tarWriter.writeEntry(
- id.getMostSignificantBits(),
- id.getLeastSignificantBits(),
- buffer,
- offset,
- length,
- generation
- );
-
- // If the segment is a data segment, update the graph before
- // (potentially) flushing the TAR file.
-
- if (segment != null) {
- populateTarGraph(segment, tarWriter);
- populateTarBinaryReferences(segment, tarWriter);
- }
-
- // Close the TAR file if the size exceeds the maximum.
-
- if (size >= maxFileSize) {
- newWriter();
- }
- } finally {
- fileStoreLock.writeLock().unlock();
- }
+ tarFiles.writeSegment(
+ id.asUUID(),
+ buffer,
+ offset,
+ length,
+ generation,
+ references,
+ binaryReferences
+ );
// Keep this data segment in memory as it's likely to be accessed soon.
-
if (segment != null) {
segmentCache.putSegment(segment);
}
}
- /**
- * Switch to a new tar writer.
- * This method may only be called when holding the write lock of {@link #fileStoreLock}
- * @throws IOException
- */
- private void newWriter() throws IOException {
- TarWriter newWriter = tarWriter.createNextGeneration();
- if (newWriter != tarWriter) {
- File writeFile = tarWriter.getFile();
- List<TarReader> list =
- newArrayListWithCapacity(1 + readers.size());
- list.add(TarReader.open(writeFile, memoryMapping, ioMonitor));
- list.addAll(readers);
- readers = list;
- tarWriter = newWriter;
- }
- }
-
private void checkDiskSpace(SegmentGCOptions gcOptions) {
long repositoryDiskSpace = size();
long availableDiskSpace = directory.getFreeSpace();
@@ -943,99 +802,28 @@ public class FileStore extends AbstractF
throws IOException {
Stopwatch watch = Stopwatch.createStarted();
Set<UUID> bulkRefs = newHashSet();
- Map<TarReader, TarReader> cleaned = newLinkedHashMap();
- long initialSize = 0;
- fileStoreLock.writeLock().lock();
- try {
- gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT);
- gcListener.updateStatus(CLEANUP.message());
-
- newWriter();
- segmentCache.clear();
-
- // Suggest to the JVM that now would be a good time
- // to clear stale weak references in the SegmentTracker
- System.gc();
-
- for (SegmentId id : tracker.getReferencedSegmentIds()) {
- if (id.isBulkSegmentId()) {
- bulkRefs.add(id.asUUID());
- }
- }
-
- for (TarReader reader : readers) {
- cleaned.put(reader, reader);
- initialSize += reader.size();
- }
- } finally {
- fileStoreLock.writeLock().unlock();
+ gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT);
+ gcListener.updateStatus(CLEANUP.message());
+ segmentCache.clear();
+
+ // Suggest to the JVM that now would be a good time
+ // to clear stale weak references in the SegmentTracker
+ System.gc();
+
+ for (SegmentId id : tracker.getReferencedSegmentIds()) {
+ bulkRefs.add(id.asUUID());
+ }
+
+ CleanupResult cleanupResult = tarFiles.cleanup(bulkRefs, compactionResult.reclaimer());
+ if (cleanupResult.isInterrupted()) {
+ gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
}
-
- gcListener.info("TarMK GC #{}: current repository size is {} ({}
bytes)",
- GC_COUNT, humanReadableByteCount(initialSize),
initialSize);
-
- Set<UUID> reclaim = newHashSet();
- for (TarReader reader : cleaned.keySet()) {
- reader.mark(bulkRefs, reclaim, compactionResult.reclaimer());
- log.info("{}: size of bulk references/reclaim set {}/{}",
- reader, bulkRefs.size(), reclaim.size());
- if (shutdown) {
- gcListener.info("TarMK GC #{}: cleanup interrupted",
GC_COUNT);
- break;
- }
- }
- Set<UUID> reclaimed = newHashSet();
- for (TarReader reader : cleaned.keySet()) {
- cleaned.put(reader, reader.sweep(reclaim, reclaimed));
- if (shutdown) {
- gcListener.info("TarMK GC #{}: cleanup interrupted",
GC_COUNT);
- break;
- }
- }
-
- // it doesn't account for concurrent commits that might have happened
- long afterCleanupSize = 0;
-
- List<TarReader> oldReaders = newArrayList();
- fileStoreLock.writeLock().lock();
- try {
- // Replace current list of reader with the cleaned readers taking care not to lose
- // any new reader that might have come in through concurrent calls to newWriter()
- List<TarReader> sweptReaders = newArrayList();
- for (TarReader reader : readers) {
- if (cleaned.containsKey(reader)) {
- TarReader newReader = cleaned.get(reader);
- if (newReader != null) {
- sweptReaders.add(newReader);
- afterCleanupSize += newReader.size();
- }
- // if these two differ, the former represents the swept version of the latter
- if (newReader != reader) {
- oldReaders.add(reader);
- }
- } else {
- sweptReaders.add(reader);
- }
- }
- readers = sweptReaders;
- } finally {
- fileStoreLock.writeLock().unlock();
- }
- tracker.clearSegmentIdTables(reclaimed, compactionResult.gcInfo());
-
- // Close old readers *after* setting readers to the new readers to avoid accessing
- // a closed reader from readSegment()
- LinkedList<File> toRemove = newLinkedList();
- for (TarReader oldReader : oldReaders) {
- closeAndLogOnFail(oldReader);
- File file = oldReader.getFile();
- toRemove.addLast(file);
- }
- gcListener.info("TarMK GC #{}: cleanup marking files for deletion:
{}", GC_COUNT, toFileNames(toRemove));
+ tracker.clearSegmentIdTables(cleanupResult.getReclaimedSegmentIds(), compactionResult.gcInfo());
+ gcListener.info("TarMK GC #{}: cleanup marking files for deletion: {}", GC_COUNT, toFileNames(cleanupResult.getRemovableFiles()));
long finalSize = size();
- long reclaimedSize = initialSize - afterCleanupSize;
+ long reclaimedSize = cleanupResult.getReclaimedSize();
stats.reclaimed(reclaimedSize);
gcJournal.persist(reclaimedSize, finalSize, getGcGeneration(),
compactionMonitor.getCompactedNodes(),
@@ -1046,7 +834,7 @@ public class FileStore extends AbstractF
GC_COUNT, watch, watch.elapsed(MILLISECONDS),
humanReadableByteCount(finalSize), finalSize,
humanReadableByteCount(reclaimedSize), reclaimedSize);
- return toRemove;
+ return cleanupResult.getRemovableFiles();
}
private String toFileNames(@Nonnull List<File> files) {
@@ -1056,7 +844,13 @@ public class FileStore extends AbstractF
return Joiner.on(",").join(files);
}
}
-
+
+ private void collectBulkReferences(Set<UUID> bulkRefs) {
+ for (SegmentId id : tracker.getReferencedSegmentIds()) {
+ bulkRefs.add(id.asUUID());
+ }
+ }
+
/**
* Finds all external blob references that are currently accessible
* in this repository and adds them to the given collector. Useful
@@ -1071,19 +865,8 @@ public class FileStore extends AbstractF
*/
synchronized void collectBlobReferences(ReferenceCollector collector) throws IOException {
segmentWriter.flush();
- List<TarReader> tarReaders = newArrayList();
- fileStoreLock.writeLock().lock();
- try {
- newWriter();
- tarReaders.addAll(FileStore.this.readers);
- } finally {
- fileStoreLock.writeLock().unlock();
- }
-
int minGeneration = getGcGeneration() - gcOptions.getRetainedGenerations() + 1;
- for (TarReader tarReader : tarReaders) {
- tarReader.collectBlobReferences(collector, minGeneration);
- }
+ tarFiles.collectBlobReferences(collector, minGeneration);
}
void cancel() {
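With the reader/writer juggling gone, the cleanup phase reduces to consuming a
TarFiles.CleanupResult. A condensed sketch of that consumption, following the cleanup()
rewrite above (gcListener, tracker, and stats are the objects already present in FileStore):

    CleanupResult result = tarFiles.cleanup(bulkRefs, compactionResult.reclaimer());
    if (result.isInterrupted()) {
        gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
    }
    tracker.clearSegmentIdTables(result.getReclaimedSegmentIds(), compactionResult.gcInfo());
    stats.reclaimed(result.getReclaimedSize());
    List<File> removable = result.getRemovableFiles(); // handed back for deferred deletion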
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java?rev=1790084&r1=1790083&r2=1790084&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreStats.java Tue Apr 4 09:49:13 2017
@@ -51,6 +51,10 @@ public class FileStoreStats implements F
repoSize.inc(initialSize);
}
+ public void init(long initialSize) {
+ repoSize.inc(initialSize);
+ }
+
//~-----------------------------< FileStoreMonitor >
@Override
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java?rev=1790084&r1=1790083&r2=1790084&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreUtil.java Tue Apr 4 09:49:13 2017
@@ -67,40 +67,50 @@ class FileStoreUtil {
return null;
}
+ static boolean containSegment(List<TarReader> readers, SegmentId id) {
+ return containSegment(readers, id.getMostSignificantBits(), id.getLeastSignificantBits());
+ }
+
/**
* Check if a segment is contained in one of the provided TAR files.
*
* @param readers A list of {@link TarReader} instances.
- * @param id An instance of {@link SegmentId}.
+ * @param msb Most significant bits of the segment ID.
+ * @param lsb Least significant bits of the segment ID.
* @return {@code true} if the segment is contained in at least one of the
* provided TAR files, {@code false} otherwise.
*/
- static boolean containSegment(List<TarReader> readers, SegmentId id) {
+ static boolean containSegment(List<TarReader> readers, long msb, long lsb) {
for (TarReader reader : readers) {
- if (reader.containsEntry(id.getMostSignificantBits(), id.getLeastSignificantBits())) {
+ if (reader.containsEntry(msb, lsb)) {
return true;
}
}
return false;
}
+ static ByteBuffer readEntry(List<TarReader> readers, SegmentId id) {
+ return readEntry(readers, id.getMostSignificantBits(), id.getLeastSignificantBits());
+ }
+
/**
* Read the entry corresponding to a segment from one of the provided TAR
* files.
*
* @param readers A list of {@link TarReader} instances.
- * @param id An instance of {@link SegmentId}.
+ * @param msb Most significant bits of the segment ID.
+ * @param lsb Least significant bits of the segment ID.
* @return An instance of {@link ByteBuffer} if the entry for the segment
* could be found, {@code null} otherwise.
*/
- static ByteBuffer readEntry(List<TarReader> readers, SegmentId id) {
+ static ByteBuffer readEntry(List<TarReader> readers, long msb, long lsb) {
for (TarReader reader : readers) {
if (reader.isClosed()) {
log.debug("Skipping closed tar file {}", reader);
continue;
}
try {
- ByteBuffer buffer = reader.readEntry(id.getMostSignificantBits(), id.getLeastSignificantBits());
+ ByteBuffer buffer = reader.readEntry(msb, lsb);
if (buffer != null) {
return buffer;
}
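The new (msb, lsb) overloads work because java.util.UUID decomposes losslessly into its
two long halves; a tiny self-contained check:

    UUID id = UUID.randomUUID();
    long msb = id.getMostSignificantBits();
    long lsb = id.getLeastSignificantBits();
    assert id.equals(new UUID(msb, lsb)); // round-trips exactly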
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java?rev=1790084&r1=1790083&r2=1790084&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java Tue Apr 4 09:49:13 2017
@@ -18,19 +18,10 @@
*/
package org.apache.jackrabbit.oak.segment.file;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Sets.newHashSet;
-import static java.util.Collections.emptyMap;
import static org.apache.jackrabbit.oak.segment.SegmentWriterBuilder.segmentWriterBuilder;
-import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -45,7 +36,6 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.Segment;
import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
import org.apache.jackrabbit.oak.segment.SegmentId;
-import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.segment.SegmentWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +51,7 @@ public class ReadOnlyFileStore extends A
private static final Logger log = LoggerFactory
.getLogger(ReadOnlyFileStore.class);
- private final List<TarReader> readers;
+ private final TarFiles tarFiles;
@Nonnull
private final SegmentWriter writer;
@@ -73,21 +63,17 @@ public class ReadOnlyFileStore extends A
ReadOnlyFileStore(FileStoreBuilder builder) throws InvalidFileStoreVersionException, IOException {
super(builder);
- Map<Integer, Map<Character, File>> map = collectFiles(directory);
-
- if (!map.isEmpty()) {
+ if (notEmptyDirectory(directory)) {
checkManifest(openManifest());
}
- this.readers = newArrayListWithCapacity(map.size());
- Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
- Arrays.sort(indices);
- for (int i = indices.length - 1; i >= 0; i--) {
- // only try to read-only recover the latest file as that might
- // be the *only* one still being accessed by a writer
- boolean recover = i == indices.length - 1;
- readers.add(TarReader.openRO(map.get(indices[i]), memoryMapping, recover, recovery, ioMonitor));
- }
+ tarFiles = TarFiles.builder()
+ .withDirectory(directory)
+ .withTarRecovery(recovery)
+ .withIOMonitor(ioMonitor)
+ .withMemoryMapping(memoryMapping)
+ .withReadOnly()
+ .build();
writer = segmentWriterBuilder("read-only").withoutCache().build(this);
log.info("TarMK ReadOnly opened: {} (mmap={})", directory,
@@ -114,25 +100,6 @@ public class ReadOnlyFileStore extends A
}
/**
- * Include the ids of all segments transitively reachable through forward
- * references from {@code referencedIds}. See OAK-3864.
- */
- private static void includeForwardReferences(Iterable<TarReader> readers,
- Set<UUID> referencedIds) throws IOException {
- Set<UUID> fRefs = newHashSet(referencedIds);
- do {
- // Add direct forward references
- for (TarReader reader : readers) {
- reader.calculateForwardReferences(fRefs);
- if (fRefs.isEmpty()) {
- break; // Optimisation: bail out if no references left
- }
- }
- // ... as long as new forward references are found.
- } while (referencedIds.addAll(fRefs));
- }
-
- /**
* Build the graph of segments reachable from an initial set of segments
*
* @param roots
@@ -141,15 +108,8 @@ public class ReadOnlyFileStore extends A
* visitor receiving call back while following the segment graph
* @throws IOException
*/
- public void traverseSegmentGraph(@Nonnull Set<UUID> roots,
- @Nonnull SegmentGraphVisitor visitor) throws IOException {
-
- List<TarReader> readers = this.readers;
- includeForwardReferences(readers, roots);
- for (TarReader reader : readers) {
- reader.traverseSegmentGraph(checkNotNull(roots),
- checkNotNull(visitor));
- }
+ public void traverseSegmentGraph(@Nonnull Set<UUID> roots, @Nonnull SegmentGraphVisitor visitor) throws IOException {
+ tarFiles.traverseSegmentGraph(roots, visitor);
}
@Override
@@ -159,7 +119,7 @@ public class ReadOnlyFileStore extends A
@Override
public boolean containsSegment(SegmentId id) {
- return FileStoreUtil.containSegment(readers, id);
+ return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
}
@Override
@@ -169,26 +129,18 @@ public class ReadOnlyFileStore extends A
return segmentCache.getSegment(id, new Callable<Segment>() {
@Override
public Segment call() throws Exception {
- ByteBuffer buffer = FileStoreUtil.readEntry(readers, id);
- if (buffer == null) {
- throw new SegmentNotFoundException(id);
- }
- return new Segment(tracker, segmentReader, id, buffer);
+ return readSegmentUncached(tarFiles, id);
}
});
} catch (ExecutionException e) {
- throw e.getCause() instanceof SegmentNotFoundException
- ? (SegmentNotFoundException) e.getCause()
- : new SegmentNotFoundException(id, e);
+ throw asSegmentNotFoundException(e, id);
}
}
@Override
public void close() {
Closer closer = Closer.create();
- for (TarReader r : readers) {
- closer.register(r);
- }
+ closer.register(tarFiles);
closer.register(revisions);
closeAndLogOnFail(closer);
System.gc(); // for any memory-mappings that are no longer used
@@ -202,39 +154,19 @@ public class ReadOnlyFileStore extends A
}
public Map<String, Set<UUID>> getTarReaderIndex() {
- Map<String, Set<UUID>> index = new HashMap<String, Set<UUID>>();
- for (TarReader reader : readers) {
- index.put(reader.getFile().getAbsolutePath(), reader.getUUIDs());
- }
- return index;
+ return tarFiles.getIndices();
}
- public Map<UUID, List<UUID>> getTarGraph(String fileName)
- throws IOException {
- for (TarReader reader : readers) {
- if (fileName.equals(reader.getFile().getName())) {
- Map<UUID, List<UUID>> graph = newHashMap();
- for (UUID uuid : reader.getUUIDs()) {
- graph.put(uuid, null);
- }
- Map<UUID, List<UUID>> g = reader.getGraph(false);
- if (g != null) {
- graph.putAll(g);
- }
- return graph;
- }
- }
- return emptyMap();
+ public Map<UUID, List<UUID>> getTarGraph(String fileName) throws IOException {
+ return tarFiles.getGraph(fileName);
}
public Iterable<SegmentId> getSegmentIds() {
- List<SegmentId> ids = newArrayList();
- for (TarReader reader : readers) {
- for (UUID uuid : reader.getUUIDs()) {
- long msb = uuid.getMostSignificantBits();
- long lsb = uuid.getLeastSignificantBits();
- ids.add(tracker.newSegmentId(msb, lsb));
- }
+ List<SegmentId> ids = new ArrayList<>();
+ for (UUID id : tarFiles.getSegmentIds()) {
+ long msb = id.getMostSignificantBits();
+ long lsb = id.getLeastSignificantBits();
+ ids.add(tracker.newSegmentId(msb, lsb));
}
return ids;
}
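traverseSegmentGraph now delegates to TarFiles, which first expands the root set to a
fixed point over forward references (includeForwardReferences in the new class below). A
self-contained toy version of that expansion, using a plain map as the segment graph; the
names here are illustrative only, not part of the API:

    static Set<UUID> closure(Map<UUID, Set<UUID>> graph, Set<UUID> roots) {
        Set<UUID> frontier = new HashSet<>(roots);
        do {
            Set<UUID> next = new HashSet<>();
            for (UUID id : frontier) {
                Set<UUID> refs = graph.get(id); // direct forward references
                if (refs != null) {
                    next.addAll(refs);
                }
            }
            frontier = next;
            // addAll returns true as long as new segment ids keep appearing
        } while (roots.addAll(frontier));
        return roots;
    }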
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java?rev=1790084&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java Tue Apr 4 09:49:13 2017
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.newArrayListWithCapacity;
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Maps.newLinkedHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+import static org.apache.commons.io.FileUtils.listFiles;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Predicate;
+import com.google.common.io.Closer;
+import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class TarFiles implements Closeable {
+
+ static class CleanupResult {
+
+ private boolean interrupted;
+
+ private long reclaimedSize;
+
+ private List<File> removableFiles;
+
+ private Set<UUID> reclaimedSegmentIds;
+
+ private CleanupResult() {
+ // Prevent external instantiation.
+ }
+
+ long getReclaimedSize() {
+ return reclaimedSize;
+ }
+
+ List<File> getRemovableFiles() {
+ return removableFiles;
+ }
+
+ Set<UUID> getReclaimedSegmentIds() {
+ return reclaimedSegmentIds;
+ }
+
+ boolean isInterrupted() {
+ return interrupted;
+ }
+
+ }
+
+ static class Builder {
+
+ private File directory;
+
+ private boolean memoryMapping;
+
+ private TarRecovery tarRecovery;
+
+ private IOMonitor ioMonitor;
+
+ private FileStoreStats fileStoreStats;
+
+ private long maxFileSize;
+
+ private boolean readOnly;
+
+ Builder withDirectory(File directory) {
+ this.directory = checkNotNull(directory);
+ return this;
+ }
+
+ Builder withMemoryMapping(boolean memoryMapping) {
+ this.memoryMapping = memoryMapping;
+ return this;
+ }
+
+ Builder withTarRecovery(TarRecovery tarRecovery) {
+ this.tarRecovery = checkNotNull(tarRecovery);
+ return this;
+ }
+
+ Builder withIOMonitor(IOMonitor ioMonitor) {
+ this.ioMonitor = checkNotNull(ioMonitor);
+ return this;
+ }
+
+ Builder withFileStoreStats(FileStoreStats fileStoreStats) {
+ this.fileStoreStats = checkNotNull(fileStoreStats);
+ return this;
+ }
+
+ Builder withMaxFileSize(long maxFileSize) {
+ checkArgument(maxFileSize > 0);
+ this.maxFileSize = maxFileSize;
+ return this;
+ }
+
+ Builder withReadOnly() {
+ this.readOnly = true;
+ return this;
+ }
+
+ public TarFiles build() throws IOException {
+ checkState(directory != null, "Directory not specified");
+ checkState(tarRecovery != null, "TAR recovery strategy not
specified");
+ checkState(ioMonitor != null, "I/O monitor not specified");
+ checkState(readOnly || fileStoreStats != null, "File store
statistics not specified");
+ checkState(readOnly || maxFileSize != 0, "Max file size not
specified");
+ return new TarFiles(this);
+ }
+
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(TarFiles.class);
+
+ private static final Pattern FILE_NAME_PATTERN = Pattern.compile("(data)((0|[1-9][0-9]*)[0-9]{4})([a-z])?.tar");
+
+ private static Map<Integer, Map<Character, File>> collectFiles(File directory) {
+ Map<Integer, Map<Character, File>> dataFiles = newHashMap();
+ for (File file : listFiles(directory, null, false)) {
+ Matcher matcher = FILE_NAME_PATTERN.matcher(file.getName());
+ if (matcher.matches()) {
+ Integer index = Integer.parseInt(matcher.group(2));
+ Map<Character, File> files = dataFiles.get(index);
+ if (files == null) {
+ files = newHashMap();
+ dataFiles.put(index, files);
+ }
+ Character generation = 'a';
+ if (matcher.group(4) != null) {
+ generation = matcher.group(4).charAt(0);
+ }
+ checkState(files.put(generation, file) == null);
+ }
+ }
+ return dataFiles;
+ }
+
+ /**
+ * Include the ids of all segments transitively reachable through forward
+ * references from {@code referencedIds}. See OAK-3864.
+ */
+ private static void includeForwardReferences(Iterable<TarReader> readers, Set<UUID> referencedIds) throws IOException {
+ Set<UUID> fRefs = newHashSet(referencedIds);
+ do {
+ // Add direct forward references
+ for (TarReader reader : readers) {
+ reader.calculateForwardReferences(fRefs);
+ if (fRefs.isEmpty()) {
+ break; // Optimisation: bail out if no references left
+ }
+ }
+ // ... as long as new forward references are found.
+ } while (referencedIds.addAll(fRefs));
+ }
+
+ static Builder builder() {
+ return new Builder();
+ }
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private final long maxFileSize;
+
+ private final boolean memoryMapping;
+
+ private final IOMonitor ioMonitor;
+
+ private final boolean readOnly;
+
+ private List<TarReader> readers;
+
+ private TarWriter writer;
+
+ private volatile boolean shutdown;
+
+ private boolean closed;
+
+ private TarFiles(Builder builder) throws IOException {
+ maxFileSize = builder.maxFileSize;
+ memoryMapping = builder.memoryMapping;
+ ioMonitor = builder.ioMonitor;
+ readOnly = builder.readOnly;
+ Map<Integer, Map<Character, File>> map = collectFiles(builder.directory);
+ readers = newArrayListWithCapacity(map.size());
+ Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
+ Arrays.sort(indices);
+ for (int i = indices.length - 1; i >= 0; i--) {
+ if (readOnly) {
+ readers.add(TarReader.openRO(map.get(indices[i]), memoryMapping, true, builder.tarRecovery, ioMonitor));
+ } else {
+ readers.add(TarReader.open(map.get(indices[i]), memoryMapping, builder.tarRecovery, ioMonitor));
+ }
+ }
+ if (!readOnly) {
+ int writeNumber = 0;
+ if (indices.length > 0) {
+ writeNumber = indices[indices.length - 1] + 1;
+ }
+ writer = new TarWriter(builder.directory, builder.fileStoreStats, writeNumber, builder.ioMonitor);
+ }
+ }
+
+ private void checkOpen() {
+ checkState(!closed, "This instance has been closed");
+ }
+
+ private void checkReadWrite() {
+ checkState(!readOnly, "This instance is read-only");
+ }
+
+ @Override
+ public void close() throws IOException {
+ shutdown = true;
+ lock.writeLock().lock();
+ try {
+ checkOpen();
+ closed = true;
+ Closer closer = Closer.create();
+ closer.register(writer);
+ writer = null;
+ for (TarReader reader : readers) {
+ closer.register(reader);
+ }
+ readers = null;
+ closer.close();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public String toString() {
+ lock.readLock().lock();
+ try {
+ return "TarFiles{readers=" + readers + ", writer=" + writer + "}";
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ long size() {
+ lock.readLock().lock();
+ try {
+ checkOpen();
+ long size = 0;
+ if (!readOnly) {
+ size = writer.fileLength();
+ }
+ for (TarReader reader : readers) {
+ size += reader.size();
+ }
+ return size;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ int readerCount() {
+ lock.readLock().lock();
+ try {
+ checkOpen();
+ return readers.size();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ void flush() throws IOException {
+ checkReadWrite();
+ lock.readLock().lock();
+ try {
+ checkOpen();
+ writer.flush();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ boolean containsSegment(long msb, long lsb) {
+ lock.readLock().lock();
+ try {
+ checkOpen();
+ if (!readOnly) {
+ if (writer.containsEntry(msb, lsb)) {
+ return true;
+ }
+ }
+ for (TarReader reader : readers) {
+ if (reader.containsEntry(msb, lsb)) {
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ ByteBuffer readSegment(long msb, long lsb) {
+ lock.readLock().lock();
+ try {
+ checkOpen();
+ try {
+ if (!readOnly) {
+ ByteBuffer buffer = writer.readEntry(msb, lsb);
+ if (buffer != null) {
+ return buffer;
+ }
+ }
+ for (TarReader reader : readers) {
+ ByteBuffer buffer = reader.readEntry(msb, lsb);
+ if (buffer != null) {
+ return buffer;
+ }
+ }
+ } catch (IOException e) {
+ log.warn("Unable to read from TAR file {}", writer, e);
+ }
+ return null;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ void writeSegment(UUID id, byte[] buffer, int offset, int length, int generation, Set<UUID> references, Set<String> binaryReferences) throws IOException {
+ checkReadWrite();
+ lock.writeLock().lock();
+ try {
+ checkOpen();
+ long size = writer.writeEntry(
+ id.getMostSignificantBits(),
+ id.getLeastSignificantBits(),
+ buffer,
+ offset,
+ length,
+ generation
+ );
+ if (references != null) {
+ for (UUID reference : references) {
+ writer.addGraphEdge(id, reference);
+ }
+ }
+ if (binaryReferences != null) {
+ for (String reference : binaryReferences) {
+ writer.addBinaryReference(generation, id, reference);
+ }
+ }
+ if (size >= maxFileSize) {
+ newWriter();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void newWriter() throws IOException {
+ TarWriter newWriter = writer.createNextGeneration();
+ if (newWriter == writer) {
+ return;
+ }
+ File writeFile = writer.getFile();
+ List<TarReader> list = newArrayListWithCapacity(1 + readers.size());
+ list.add(TarReader.open(writeFile, memoryMapping, ioMonitor));
+ list.addAll(readers);
+ readers = list;
+ writer = newWriter;
+ }
+
+ CleanupResult cleanup(Set<UUID> references, Predicate<Integer> reclaimGeneration) throws IOException {
+ checkReadWrite();
+
+ CleanupResult result = new CleanupResult();
+ result.removableFiles = new ArrayList<>();
+ result.reclaimedSegmentIds = new HashSet<>();
+
+ Map<TarReader, TarReader> cleaned = newLinkedHashMap();
+
+ lock.writeLock().lock();
+ lock.readLock().lock();
+ try {
+ try {
+ checkOpen();
+ newWriter();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ // At this point the write lock is downgraded to a read lock for
+ // better concurrency. It is always necessary to access TarReader
+ // and TarWriter instances while holding a lock (either in read or
+ // write mode) to prevent a concurrent #close(). In this case, we
+ // don't need exclusive access to the TarReader instances.
+
+ // TODO now that the two protected sections have been merged thanks
+ // to lock downgrading, check if the following code can be
+ // simplified.
+
+ for (TarReader reader : readers) {
+ cleaned.put(reader, reader);
+ result.reclaimedSize += reader.size();
+ }
+ Set<UUID> reclaim = newHashSet();
+ for (TarReader reader : cleaned.keySet()) {
+ if (shutdown) {
+ result.interrupted = true;
+ return result;
+ }
+ reader.mark(references, reclaim, reclaimGeneration);
+ log.info("{}: size of bulk references/reclaim set {}/{}",
reader, references.size(), reclaim.size());
+ }
+ for (TarReader reader : cleaned.keySet()) {
+ if (shutdown) {
+ result.interrupted = true;
+ return result;
+ }
+ cleaned.put(reader, reader.sweep(reclaim, result.reclaimedSegmentIds));
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+
+ List<TarReader> oldReaders = newArrayList();
+ lock.writeLock().lock();
+ try {
+ // Replace current list of readers with the cleaned readers taking care not to lose
+ // any new reader that might have come in through concurrent calls to newWriter()
+ checkOpen();
+ List<TarReader> sweptReaders = newArrayList();
+ for (TarReader reader : readers) {
+ if (cleaned.containsKey(reader)) {
+ TarReader newReader = cleaned.get(reader);
+ if (newReader != null) {
+ sweptReaders.add(newReader);
+ result.reclaimedSize -= newReader.size();
+ }
+ // if these two differ, the former represents the swept version of the latter
+ if (newReader != reader) {
+ oldReaders.add(reader);
+ }
+ } else {
+ sweptReaders.add(reader);
+ }
+ }
+ readers = sweptReaders;
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ for (TarReader oldReader : oldReaders) {
+ try {
+ oldReader.close();
+ } catch (IOException e) {
+ log.error("Unable to close swept TAR reader", e);
+ }
+ result.removableFiles.add(oldReader.getFile());
+ }
+
+ return result;
+ }
+
+ void collectBlobReferences(ReferenceCollector collector, int minGeneration) throws IOException {
+ lock.writeLock().lock();
+ lock.readLock().lock();
+ try {
+ try {
+ checkOpen();
+ if (!readOnly) {
+ newWriter();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ // At this point the write lock is downgraded to a read lock for
+ // better concurrency. It is always necessary to access TarReader
+ // and TarWriter instances while holding a lock (either in read or
+ // write mode) to prevent a concurrent #close(). In this case, we
+ // don't need exclusive access to the TarReader instances.
+
+ for (TarReader reader : readers) {
+ reader.collectBlobReferences(collector, minGeneration);
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ Iterable<UUID> getSegmentIds() {
+ lock.readLock().lock();
+ try {
+ checkOpen();
+ List<UUID> ids = new ArrayList<>();
+ for (TarReader reader : readers) {
+ ids.addAll(reader.getUUIDs());
+ }
+ return ids;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ Map<UUID, List<UUID>> getGraph(String fileName) throws IOException {
+ Set<UUID> index = null;
+ Map<UUID, List<UUID>> graph = null;
+
+ lock.readLock().lock();
+ try {
+ checkOpen();
+ for (TarReader reader : readers) {
+ if (fileName.equals(reader.getFile().getName())) {
+ index = reader.getUUIDs();
+ graph = reader.getGraph(false);
+ break;
+ }
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+
+ Map<UUID, List<UUID>> result = new HashMap<>();
+ if (index != null) {
+ for (UUID uuid : index) {
+ result.put(uuid, null);
+ }
+ }
+ if (graph != null) {
+ result.putAll(graph);
+ }
+ return result;
+ }
+
+ Map<String, Set<UUID>> getIndices() {
+ lock.readLock().lock();
+ try {
+ checkOpen();
+ Map<String, Set<UUID>> index = new HashMap<>();
+ for (TarReader reader : readers) {
+ index.put(reader.getFile().getAbsolutePath(), reader.getUUIDs());
+ }
+ return index;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ void traverseSegmentGraph(Set<UUID> roots, SegmentGraphVisitor visitor) throws IOException {
+ lock.readLock().lock();
+ try {
+ checkOpen();
+ includeForwardReferences(readers, roots);
+ for (TarReader reader : readers) {
+ reader.traverseSegmentGraph(roots, visitor);
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+}
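Both cleanup() and collectBlobReferences() above rely on the lock-downgrading idiom
supported by ReentrantReadWriteLock: a thread holding the write lock may additionally
acquire the read lock and then release the write lock, keeping the exclusive window
short. A minimal standalone sketch of the idiom:

    ReadWriteLock lock = new ReentrantReadWriteLock();

    lock.writeLock().lock();
    lock.readLock().lock();            // permitted while holding the write lock
    try {
        try {
            // exclusive phase, e.g. newWriter()
        } finally {
            lock.writeLock().unlock(); // downgrade: other readers may now proceed
        }
        // long-running read-only phase, e.g. mark/sweep over the readers
    } finally {
        lock.readLock().unlock();
    }

The reverse (upgrading a read lock to a write lock) is not supported by
ReentrantReadWriteLock, which is why the write lock is always taken first.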
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarFiles.java
------------------------------------------------------------------------------
svn:eol-style = native